// SecondaryNameNode can be started in 2 modes: // 手动执行checkpoint或者获取未执行checkpoint的事务数量 // 1. run a command (i.e. checkpoint or geteditsize) then terminate // 作为一个守护进程,开启InfoServer和CheckpointThread定期执行checkpoint // 2. run as a daemon when {@link #parseArgs} yields no commands if (opts != null && opts.getCommand() != null) { // mode 1 int ret = secondary.processStartupCommand(opts); terminate(ret); } else { // mode 2 secondary.startInfoServer(); secondary.startCheckpointThread(); secondary.join(); }
publicvoiddoWork(){ ... try { // We may have lost our ticket since last checkpoint, log in again, just in case if(UserGroupInformation.isSecurityEnabled()) UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
finallong monotonicNow = Time.monotonicNow(); finallong now = Time.now();
// Tell the namenode to start logging transactions in a new edit file // Returns a token that would be used to upload the merged image. //namenode.rollEditLog() //通知nn进行滚动edit文件,向新文件中写日志,并且返回一个CheckpointSignature对象,该对象中包含了curSegmentTxId和mostRecentCheckpointTxId等信息 //curSegmentTxId是滚动生成的新文件的起始txid //mostRecentCheckpointTxId是nn上的fsimage最后的txid CheckpointSignature sig = namenode.rollEditLog();
boolean loadImage = false; boolean isFreshCheckpointer = (checkpointImage.getNamespaceID() == 0); boolean isSameCluster = (dstStorage.versionSupportsFederation(NameNodeLayoutVersion.FEATURES) && sig.isSameCluster(checkpointImage)) || (!dstStorage.versionSupportsFederation(NameNodeLayoutVersion.FEATURES) && sig.namespaceIdMatches(checkpointImage)); if (isFreshCheckpointer || (isSameCluster && !sig.storageVersionMatches(checkpointImage.getStorage()))) { // if we're a fresh 2NN, or if we're on the same cluster and our storage // needs an upgrade, just take the storage info from the server. // 如果我们是第一次启动2nn,或者我们的2nn节点 needs an upgrade dstStorage.setStorageInfo(sig); dstStorage.setClusterID(sig.getClusterID()); dstStorage.setBlockPoolID(sig.getBlockpoolID()); loadImage = true; } sig.validateStorageInfo(checkpointImage);
// error simulation code for junit test CheckpointFaultInjector.getInstance().afterSecondaryCallsRollEditLog(); // 获取nn上的元数据目录信息 RemoteEditLogManifest manifest = namenode.getEditLogManifest(sig.mostRecentCheckpointTxId + 1);
// Fetch fsimage and edits. Reload the image if previous merge failed. // 下载fsimage文件和edit文件,这个downloadCheckpointFiles方法中会根据txid进行判断文件顺序是否正确,还有是否需要从nn的fsimage上拉取文件等操作,下面有展开说明 loadImage |= downloadCheckpointFiles( fsName, checkpointImage, sig, manifest) | checkpointImage.hasMergeError(); try {
doMerge(sig, manifest, loadImage, checkpointImage, namesystem); } catch (IOException ioe) { // A merge error occurred. The in-memory file system state may be // inconsistent, so the image and edits need to be reloaded. checkpointImage.setMergeError(); throw ioe; } // Clear any error since merge was successful. checkpointImage.clearMergeError();
// // Upload the new image into the NameNode. Then tell the Namenode // to make this new uploaded image as the most current image. //
// Fsimage.rollEditLog(): nn 滚动生成一个新的edit文件 CheckpointSignature rollEditLog(int layoutVersion)throws IOException { // nn 滚动生成一个新的edits文件 并且txid+1 赋值给curSegmentTxId getEditLog().rollEditLog(layoutVersion); // Record this log segment ID in all of the storage directories, so // we won't miss this log segment on a restart if the edits directories // go missing. storage.writeTransactionIdFileToStorage(getEditLog().getCurSegmentTxId()); //Update NameDirSize Metric getStorage().updateNameDirSize(); //创建并返回一个CheckpointSignature 其中包含了curSegmentTxId和mostRecentCheckpointTxId等信息 //curSegmentTxId是滚动生成的新文件的起始txid //mostRecentCheckpointTxId是nn上的fsimage最后的txid returnnew CheckpointSignature(this); }
// secondary namenode从namenode上下载edits文件和fsimage方法 staticbooleandownloadCheckpointFiles( final URL nnHostPort, final FSImage dstImage, final CheckpointSignature sig, final RemoteEditLogManifest manifest )throws IOException {
// Sanity check manifest - these could happen if, eg, someone on the // NN side accidentally rmed the storage directories if (manifest.getLogs().isEmpty()) { thrownew IOException("Found no edit logs to download on NN since txid " + sig.mostRecentCheckpointTxId); } // nn中的fsimage中最新的txid+1是我们想开始拉取的id long expectedTxId = sig.mostRecentCheckpointTxId + 1;
JournalNode Cluster 采用了类 Paxos 的多数派(Quorum)协议来保证高可用性:每一批 EditLog 只要成功写入过半数的 JournalNode 就算写入成功,因此即使个别 JournalNode 挂掉(N 个节点最多可容忍 (N-1)/2 个故障),集群仍然可以正常提供服务,JournalNode 之间并不需要选举 leader。这个机制在 Hadoop 中被称为 QJM(HA using Quorum Journal Manager)架构,也是当前 Hadoop 默认的 HA 架构。
Hadoop HA QJM 架构
Active NameNode(ANN):在HDFS集群中,对外提供读写服务的唯一Master节点。ANN将客户端请求过来的写操作通过EditLog写入共享存储系统(即JournalNode Cluster),为Standby NameNode及时同步数据提供支持;
privatevoiddoCheckpoint()throws InterruptedException, IOException { assert canceler != null; finallong txid; final NameNodeFile imageType; // Acquire cpLock to make sure no one is modifying the name system. // It does not need the full namesystem write lock, since the only thing // that modifies namesystem on standby node is edit log replaying.
// 这里获取cplock hdfs有两种锁 一种是fslock会将整个fsSystem锁住,另一种是专门给standby nn 进行ckeckpoint使用的cplock,这个锁不会影响块信息和元数据之间的映射关系,所以不用将整个文件系统锁住,应该只是锁住了active nn中的editlog文件,让他的txid不会再增长。 // Prevent reading of name system while being modified. The full // name system lock will be acquired to further block even the block // state updates. namesystem.cpLockInterruptibly(); try { assert namesystem.getEditLog().isOpenForRead() : "Standby Checkpointer should only attempt a checkpoint when " + "NN is in standby mode, but the edit logs are in an unexpected state";
FSImage img = namesystem.getFSImage();
// 获得上次进行checkpoint生成的fsimage中最大的txID long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId(); // 获得正在写的editlog中最新的txID long thisCheckpointTxId = img.getCorrectLastAppliedOrWrittenTxId(); assert thisCheckpointTxId >= prevCheckpointTxId; if (thisCheckpointTxId == prevCheckpointTxId) { LOG.info("A checkpoint was triggered but the Standby Node has not " + "received any transactions since the last checkpoint at txid {}. " + "Skipping...", thisCheckpointTxId); return; }
if (namesystem.isRollingUpgrade() && !namesystem.getFSImage().hasRollbackFSImage()) { // 如果我们是要进行 rolling upgrade 滚动升级,则生成的fsimage叫做“fsimage_rollback”开头 // 就设置imageType为fsimage_rollback否则就设置为正常的"fsimage"开头 imageType = NameNodeFile.IMAGE_ROLLBACK; } else { imageType = NameNodeFile.IMAGE; } // Save the contents of the FS image to a new image file in each of the current storage directories.
// saveNamespace 这个方法将会在指定的目录生成image文件 img.saveNamespace(namesystem, imageType, canceler); txid = img.getStorage().getMostRecentCheckpointTxId(); assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" + thisCheckpointTxId + " but instead saved at txid=" + txid;
// Save the legacy OIV image, if the output dir is defined. String outputDir = checkpointConf.getLegacyOivImageDir(); if (outputDir != null && !outputDir.isEmpty()) { try { img.saveLegacyOIVImage(namesystem, outputDir, canceler); } catch (IOException ioe) { LOG.warn("Exception encountered while saving legacy OIV image; " + "continuing with other checkpointing steps", ioe); } } } finally { namesystem.cpUnlock(); } ...
/** * Save the contents of the FS image to a new image file in each of the * current storage directories. */ publicsynchronizedvoidsaveNamespace(FSNamesystem source, NameNodeFile nnf, Canceler canceler)throws IOException { assert editLog != null : "editLog must be initialized"; LOG.info("Save namespace ..."); storage.attemptRestoreRemovedStorage();
boolean editLogWasOpen = editLog.isSegmentOpen();
if (editLogWasOpen) { editLog.endCurrentLogSegment(true); } // 获得最新写入的txID long imageTxId = getCorrectLastAppliedOrWrittenTxId(); if (!addToCheckpointing(imageTxId)) { thrownew IOException( "FS image is being downloaded from another NN at txid " + imageTxId); } try { try { // 这个方法会将文件全部合并为fsimage到指定目录中 saveFSImageInAllDirs(source, nnf, imageTxId, canceler); if (!source.isRollingUpgrade()) { updateStorageVersion(); } } finally { if (editLogWasOpen) { editLog.startLogSegmentAndWriteHeaderTxn(imageTxId + 1, source.getEffectiveLayoutVersion()); // Take this opportunity to note the current transaction. // Even if the namespace save was cancelled, this marker // is only used to determine what transaction ID is required // for startup. So, it doesn't hurt to update it unnecessarily. storage.writeTransactionIdFileToStorage(imageTxId + 1); } } } finally { removeFromCheckpointing(imageTxId); } //Update NameDirSize Metric getStorage().updateNameDirSize();
if (exitAfterSave.get()) { LOG.error("NameNode process will exit now... The saved FsImage " + nnf + " is potentially corrupted."); ExitUtil.terminate(-1); } }
/** * @see #saveFSImageInAllDirs(FSNamesystem, NameNodeFile, long, Canceler) */ protectedsynchronizedvoidsaveFSImageInAllDirs(FSNamesystem source, long txid) throws IOException { if (!addToCheckpointing(txid)) { thrownew IOException(("FS image is being downloaded from another NN")); } try { saveFSImageInAllDirs(source, NameNodeFile.IMAGE, txid, null); } finally { removeFromCheckpointing(txid); } }
Post title:HDFS CheckPoint 流程源码分析
Post author:刘梦凯
Create time:2022-10-11 10:14:42
Post link:https://liumengkai.github.io/2022/10/11/HDFS-CheckPoint-流程源码分析/
Copyright Notice:All articles in this blog are licensed under BY-NC-SA unless stated otherwise.