HDFS CheckPoint 流程源码分析
刘梦凯 Lv3

Talk is cheap show me the code!

HDFS 在非 HA 模式的集群下,NameNode 和 DataNode 是一个主从的架构。在这样的主从架构之下只有一台 NameNode。一台 NameNode 的好处是无需因为元数据的同步而考虑数据的一致性问题。但是,只有一台 NameNode 也会有很多的坏处,因为,单台 NameNode 的情况下会出现所有单机都会出现的问题。最简单的问题就是,当这一台 NameNode 挂掉后,整个集群将不可用了。

为了解决单台 NameNode 挂掉不可用的问题,HDFS 在 2.x 版本的时候引入了 HDFS 集群的 HA 模式,也就是有了 NameNode 的主备架构。在 2.x 的版本中,HDFS 支持一主一备的架构,在 3.x 的版本中最多支持 5 个,官方推荐使用 3 个。

上面我们知道了Hadoop集群为了解决单点故障问题,在2.x的时候引用了HA架构,之后3.x的时候增加了standy nn的数量最大为5个,在我们的集群HA开启和关闭的两种情况下我们的hdfs checkpoint也是有很大区别的,下面我们分别来从源码角度分析一下。

HA关闭情况下HDFS Checkpoint流程

首先在我们开始了解checkpoint之前,我们需要了解几个关于Hadoop元数据的基础知识

txid

Hadoop为每一次操作都赋予了一个独一无二的transactionId,简称为txid,如何保证txid顺序递增,加锁,在hdfs这种高并发的系统中肯定不能普通的加锁释放操作,性能达不到需求,hdfs如何保证txid顺序递增?

fsimage

fsimage文件是Hadoop系统元数据的永久性检查点,包含了系统中所有文件的目录和文件inode序列化信息

edits

edits日志文件存放hadoop所有操作的日志信息,操作首先会被记录到edits文件中,定时合并为fsimage文件,在没有开启HA的情况下是由secondary nn来进行合并操作,开启HA的情况下是JournalNode节点来进行合并以及同步。

上面三个知识点在了解完毕之后我们来看关闭HA状况下的checkpoint流程。

Secondary NameNode是一个周期性唤醒的守护进程,定时的来触发checkpoint操作,checkpoint条件有两个:

(1)超过了配置的检查点时间时长

(2)没有合并的操作次数超过了配置值(dfs.namenode.checkpoint.txns)默认为100w

打开Hadoop的SecondaryNameNode.java文件,这个文件就是2nn的启动进程,其中的main方法注释写明了在什么条件下会启动checkpoint。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// SecondaryNameNode can be started in 2 modes:
// 手动执行checkpoint或者获取未执行checkpoint的事务数量
// 1. run a command (i.e. checkpoint or geteditsize) then terminate
// 作为一个守护进程,开启InfoServer和CheckpointThread定期执行checkpoint
// 2. run as a daemon when {@link #parseArgs} yields no commands

if (opts != null && opts.getCommand() != null) {
// mode 1
int ret = secondary.processStartupCommand(opts);
terminate(ret);
} else {
// mode 2
secondary.startInfoServer();
secondary.startCheckpointThread();
secondary.join();
}

我们主要讲解第二种情况:

其中startInfoServer()主要功能是建立一个http server和namenode保持通信,而startCheckpointThread()作用是启动一个SecondaryNameNode线程,我们SecondaryNameNode类本身就是实现了Runnable的,所以可以被当做线程启动。

run方法中调用doWork(); 下面是SecondaryNameNode.doWork()主要内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void doWork() {
...
try {
// We may have lost our ticket since last checkpoint, log in again, just in case
if(UserGroupInformation.isSecurityEnabled())
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();

final long monotonicNow = Time.monotonicNow();
final long now = Time.now();

// 根据操作次数和checkpoint间隔时间判断是否进行checkpoint
if (shouldCheckpointBasedOnCount() ||
monotonicNow >= lastCheckpointTime + 1000 * checkpointConf.getPeriod()) {
//checkpoint主要流程
doCheckpoint();
// 上传完毕之后更新2nn对象上的lastcheckpoint时间
lastCheckpointTime = monotonicNow;
lastCheckpointWallclockTime = now;
}
}
...

doCheckpoint方法首先就是调用namenode.rollEditLog()来滚动nn中正在写的edit文件,然后返回一个包含nn的元数据相关信息的CheckpointSignature

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public boolean doCheckpoint() throws IOException {
checkpointImage.ensureCurrentDirExists();
NNStorage dstStorage = checkpointImage.getStorage();

// Tell the namenode to start logging transactions in a new edit file
// Returns a token that would be used to upload the merged image.
//namenode.rollEditLog()

//通知nn进行滚动edit文件,向新文件中写日志,并且返回一个CheckpointSignature对象,该对象中包含了curSegmentTxId和mostRecentCheckpointTxId等信息
//curSegmentTxId是滚动生成的新文件的起始txid
//mostRecentCheckpointTxId是nn上的fsimage最后的txid
CheckpointSignature sig = namenode.rollEditLog();

boolean loadImage = false;
boolean isFreshCheckpointer = (checkpointImage.getNamespaceID() == 0);
boolean isSameCluster =
(dstStorage.versionSupportsFederation(NameNodeLayoutVersion.FEATURES)
&& sig.isSameCluster(checkpointImage)) ||
(!dstStorage.versionSupportsFederation(NameNodeLayoutVersion.FEATURES)
&& sig.namespaceIdMatches(checkpointImage));
if (isFreshCheckpointer ||
(isSameCluster &&
!sig.storageVersionMatches(checkpointImage.getStorage()))) {
// if we're a fresh 2NN, or if we're on the same cluster and our storage
// needs an upgrade, just take the storage info from the server.
// 如果我们是第一次启动2nn,或者我们的2nn节点 needs an upgrade
dstStorage.setStorageInfo(sig);
dstStorage.setClusterID(sig.getClusterID());
dstStorage.setBlockPoolID(sig.getBlockpoolID());
loadImage = true;
}
sig.validateStorageInfo(checkpointImage);

// error simulation code for junit test
CheckpointFaultInjector.getInstance().afterSecondaryCallsRollEditLog();
// 获取nn上的元数据目录信息
RemoteEditLogManifest manifest =
namenode.getEditLogManifest(sig.mostRecentCheckpointTxId + 1);

// Fetch fsimage and edits. Reload the image if previous merge failed.
// 下载fsimage文件和edit文件,这个downloadCheckpointFiles方法中会根据txid进行判断文件顺序是否正确,还有是否需要从nn的fsimage上拉取文件等操作,下面有展开说明
loadImage |= downloadCheckpointFiles(
fsName, checkpointImage, sig, manifest) |
checkpointImage.hasMergeError();
try {

// 将fsimage和edit文件进行合并 如果中间缺少哪个文件可以
// 通过reloadFromImageFile()方法单独从nn上下载

doMerge(sig, manifest, loadImage, checkpointImage, namesystem);
} catch (IOException ioe) {
// A merge error occurred. The in-memory file system state may be
// inconsistent, so the image and edits need to be reloaded.
checkpointImage.setMergeError();
throw ioe;
}
// Clear any error since merge was successful.
checkpointImage.clearMergeError();


//
// Upload the new image into the NameNode. Then tell the Namenode
// to make this new uploaded image as the most current image.
//

// 获得2nn上最新的txid 然后上传最新的fsimage给nn
long txid = checkpointImage.getLastAppliedTxId();
TransferFsImage.uploadImageFromStorage(fsName, conf, dstStorage,
NameNodeFile.IMAGE, txid);

// error simulation code for junit test
CheckpointFaultInjector.getInstance().afterSecondaryUploadsNewImage();

LOG.warn("Checkpoint done. New Image Size: "
+ dstStorage.getFsImageName(txid).length());

if (legacyOivImageDir != null && !legacyOivImageDir.isEmpty()) {
try {
checkpointImage.saveLegacyOIVImage(namesystem, legacyOivImageDir,
new Canceler());
} catch (IOException e) {
LOG.warn("Failed to write legacy OIV image: ", e);
}
}
return loadImage;
}

namenode.rollEditLog()需要注意namenode是interface,调用的实际上是NameNodeRpcServer.rollEditLog()方法和nn交互,滚动edit文件,下面我们来看一下NameNodeRpcServer.rollEditLog()方法主要作用:

1.检查namenode 是否是安全模式 HA有没有开启等 处于安全模式或者HA开启则抛出异常
2.getFSImage().rollEditLog()调用FSImage.rollEditLog()滚动生成一个新的edit文件 返回一个CheckpointSignature对象 其中包含了curSegmentTxId(当前nn正在写的文件的txid) 等等一系列元数据相关信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// namenode.rollEditLog()->NameNodeRpcServer.rollEditLog()->FSNamesystem.rollEditLog():
CheckpointSignature rollEditLog() throws IOException {
String operationName = "rollEditLog";
CheckpointSignature result = null;
checkSuperuserPrivilege(operationName);
checkOperation(OperationCategory.JOURNAL);
writeLock();
try {
checkOperation(OperationCategory.JOURNAL);
// 检查namenode 是否是安全模式 HA有没有开启等
checkNameNodeSafeMode("Log not rolled");
if (Server.isRpcInvocation()) {
LOG.info("Roll Edit Log from " + Server.getRemoteAddress());
}
// 这个方法会滚动edit日志文件
result = getFSImage().rollEditLog(getEffectiveLayoutVersion());
} finally {
writeUnlock(operationName, getLockReportInfoSupplier(null));
}
logAuditEvent(true, operationName, null);
return result;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Fsimage.rollEditLog(): nn 滚动生成一个新的edit文件
CheckpointSignature rollEditLog(int layoutVersion) throws IOException {

// nn 滚动生成一个新的edits文件 并且txid+1 赋值给curSegmentTxId

getEditLog().rollEditLog(layoutVersion);
// Record this log segment ID in all of the storage directories, so
// we won't miss this log segment on a restart if the edits directories
// go missing.
storage.writeTransactionIdFileToStorage(getEditLog().getCurSegmentTxId());
//Update NameDirSize Metric
getStorage().updateNameDirSize();

//创建并返回一个CheckpointSignature 其中包含了curSegmentTxId和mostRecentCheckpointTxId等信息
//curSegmentTxId是滚动生成的新文件的起始txid
//mostRecentCheckpointTxId是nn上的fsimage最后的txid
return new CheckpointSignature(this);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// secondary namenode从namenode上下载edits文件和fsimage方法
static boolean downloadCheckpointFiles(
final URL nnHostPort,
final FSImage dstImage,
final CheckpointSignature sig,
final RemoteEditLogManifest manifest
) throws IOException {

// Sanity check manifest - these could happen if, eg, someone on the
// NN side accidentally rmed the storage directories
if (manifest.getLogs().isEmpty()) {
throw new IOException("Found no edit logs to download on NN since txid "
+ sig.mostRecentCheckpointTxId);
}
// nn中的fsimage中最新的txid+1是我们想开始拉取的id
long expectedTxId = sig.mostRecentCheckpointTxId + 1;

// 检查nn中的第一个edit文件的开始的txId和我们想拉取的expectedTxId是否相匹配

if (manifest.getLogs().get(0).getStartTxId() != expectedTxId) {
throw new IOException("Bad edit log manifest (expected txid = " +
expectedTxId + ": " + manifest);
}

try {
Boolean b = UserGroupInformation.getCurrentUser().doAs(
new PrivilegedExceptionAction<Boolean>() {

@Override
public Boolean run() throws Exception {
dstImage.getStorage().cTime = sig.cTime;

// 检查nn上fsimage的txid和2nn上fsimage的txid是否相等,相等的话2nn就不用从nn上拉取fsimage文件

if (sig.mostRecentCheckpointTxId ==
dstImage.getStorage().getMostRecentCheckpointTxId()) {
LOG.info("Image has not changed. Will not download image.");
} else {
LOG.info("Image has changed. Downloading updated image from NN.");
MD5Hash downloadedHash = TransferFsImage.downloadImageToStorage(
nnHostPort, sig.mostRecentCheckpointTxId,
dstImage.getStorage(), true, false);
dstImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE,
sig.mostRecentCheckpointTxId, downloadedHash);
}

// get edits file
// 将所有edits文件下载下来
for (RemoteEditLog log : manifest.getLogs()) {
TransferFsImage.downloadEditsToStorage(
nnHostPort, log, dstImage.getStorage());
}

// true if we haven't loaded all the transactions represented by the
// downloaded fsimage.
// lastAppliedTxId 这个指标是2nn查看自己最新的txid,包括了fsimage和edit文件中最新的
// 上面拉取完成之后2nn上最新的txid和nn上的fsimage中最新的id相比较肯定是要大的 如果是小的话就是文件没有拉下来
return dstImage.getLastAppliedTxId() < sig.mostRecentCheckpointTxId;
}
});
return b.booleanValue();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

HA开启的情况下HDFS的checkpoint流程

如果开启了HA,那么我们的NameNode就分为Active和standby两种情况,如果active nn出现问题那么就需要及时的切换为standby模式下的nn,这就需要active和standby的namenode节点元数据信息完全一致,那么是如何保证一致性的呢?

NameNode存储的元数据有两种,一种是由hdfs client提交命令生成的元数据,比如创建文件夹等操作,另一种是datanode的block信息,而为了保证快速切换,datanode会将自己所有的块信息发送给active namenode 和 standby namenodes,这样需要同步的信息就只剩下了由HDFS client产生的元数据变化,那么这个如何操作呢?下面我们来分析一下有几种方案。

1.同步阻塞式

HDFS client同步提交操作给active namenode之后再由其同步给standby namenode,这样确实可以保证两边的元数据完全一致,但是带来的影响也非常明显,如果standby namenode节点出现问题导致同步没有完成一直等待,超时后告诉用户是因为备用节点出现问题,备用节点出现问题导致可用性降低这个肯定是我们不能接受的现象,那么另一种方法呢?

2.异步阻塞式

HDFS client同步提交操作给active namenode再由其同步给standby namenode,然后stand by nn接收到命令之后直接返回一个成功,这样确实不会影响用户体验性,但是如果处理过成功一些问题导致失败,那么就不能保证active namenode和stanyby namenode之间的数据一致性。

一个不能保证一致性,一个不能保证可用性,还有什么办法呢?还真的有,那就是引入一个中间层,由这个中间层来进行和nn之间的交互,这个中间层就是JournalNode Cluster,active nn向JournalNode中同步元数据信息,然后standby nn从JournalNode中同步过来元数据即可,这样就算standby nn出现问题也可以恢复后继续从JournalNode中拉取元数据,听起来和直接由active nn向stand by nn同步数据类似,那如果JournalNode出现问题应该怎么办呢?JournalNode是存在奇数个(3,5,7)的,只要active nn成功向 (JournalNode数-1)/2数量的JournalNode写入之后,JournalNode就会返回一个成功信息给active nn。

参考:
Paxos协议
分布式系统和去中心化区别

采取了Paxos协议的JournalNode Cluster保证了高可用性,如果某一个JournalNode挂掉,那么马上JournalNode集群会选举一个新的leader出来,这个机制在Hadoop中被称为QJM(HA using Quorum Journal Manager)架构,也是当前Hadoop默认的HA架构。

Hadoop HA QJM 架构

Active NameNode(ANN):在HDFS集群中,对外提供读写服务的唯一Master节点。ANN将客户端请求过来的写操作通过EditLog写入共享存储系统(即JournalNode Cluster),为Standby NameNode及时同步数据提供支持;

Standby NameNode(SBN):与ANN相互形成热备,SBN及时从共享存储系统中读取EditLog数据并更新内存,以保证当前状态尽可能与ANN同步。当前在整个HDFS集群中最多一台处于Active状态,最多一台处于Standby状态;

JournalNode Cluster(JNs):ANN与SBN之间共享Editlog的一致性存储系统,是HDFS NameNode高可用的核心组件。借助JournalNode集群ANN可以尽可能及时同步元数据到SBN。其中ANN采用Push模式将EditLog写入JN,SBN通过Pull模式从JN拉取数据,整个过程中JN不主动进行数据交换;

ZKFailoverController(ZKFC):ZKFailoverController以独立进程运行,对NameNode主备切换进行控制,正常情况ANN和SBN分别对应各自ZKFC进程。ZKFC主要功能:NameNode健康状况检测;借助Zookeeper实现NameNode自动选主;操作NameNode进行主从切换;

启动方式不同,2nn是一个单独的main方法去启动,而standbynn是一个随着namenode启动随之被调用的程序,但是其中的很多内部方法都非常类似,比如同样都是调用doCheckpoint()方法开始checkpoint,在之前也是要根据操作次数和checkpoint间隔时间判断是否进行checkpoint,需要注意的是,我们可以配置多个fsimage的目录

dfs.namenode.name.dir /data1/data/hadoop/namenode,/data2/data/hadoop/namenode

doCheckpoint 中 img.saveNamespace()->FSImage.saveFSImageInAllDirs() 最终会将所有fsimage保存在指定目录中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private void doCheckpoint() throws InterruptedException, IOException {
assert canceler != null;
final long txid;
final NameNodeFile imageType;
// Acquire cpLock to make sure no one is modifying the name system.
// It does not need the full namesystem write lock, since the only thing
// that modifies namesystem on standby node is edit log replaying.

// 这里获取cplock hdfs有两种锁 一种是fslock会将整个fsSystem锁住,另一种是专门给standby nn 进行ckeckpoint使用的cplock,这个锁不会影响块信息和元数据之间的映射关系,所以不用将整个文件系统锁住,应该只是锁住了active nn中的editlog文件,让他的txid不会再增长。
// Prevent reading of name system while being modified. The full
// name system lock will be acquired to further block even the block
// state updates.
namesystem.cpLockInterruptibly();
try {
assert namesystem.getEditLog().isOpenForRead() :
"Standby Checkpointer should only attempt a checkpoint when " +
"NN is in standby mode, but the edit logs are in an unexpected state";

FSImage img = namesystem.getFSImage();

// 获得上次进行checkpoint生成的fsimage中最大的txID
long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId();
// 获得正在写的editlog中最新的txID
long thisCheckpointTxId = img.getCorrectLastAppliedOrWrittenTxId();
assert thisCheckpointTxId >= prevCheckpointTxId;
if (thisCheckpointTxId == prevCheckpointTxId) {
LOG.info("A checkpoint was triggered but the Standby Node has not " +
"received any transactions since the last checkpoint at txid {}. " +
"Skipping...", thisCheckpointTxId);
return;
}

if (namesystem.isRollingUpgrade()
&& !namesystem.getFSImage().hasRollbackFSImage()) {
// 如果我们是要进行 rolling upgrade 滚动升级,则生成的fsimage叫做“fsimage_rollback”开头
// 就设置imageType为fsimage_rollback否则就设置为正常的"fsimage"开头
imageType = NameNodeFile.IMAGE_ROLLBACK;
} else {
imageType = NameNodeFile.IMAGE;
}
// Save the contents of the FS image to a new image file in each of the current storage directories.

// saveNamespace 这个方法将会在指定的目录生成image文件
img.saveNamespace(namesystem, imageType, canceler);
txid = img.getStorage().getMostRecentCheckpointTxId();
assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
thisCheckpointTxId + " but instead saved at txid=" + txid;

// Save the legacy OIV image, if the output dir is defined.
String outputDir = checkpointConf.getLegacyOivImageDir();
if (outputDir != null && !outputDir.isEmpty()) {
try {
img.saveLegacyOIVImage(namesystem, outputDir, canceler);
} catch (IOException ioe) {
LOG.warn("Exception encountered while saving legacy OIV image; "
+ "continuing with other checkpointing steps", ioe);
}
}
} finally {
namesystem.cpUnlock();
}
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* Save the contents of the FS image to a new image file in each of the
* current storage directories.
*/
public synchronized void saveNamespace(FSNamesystem source, NameNodeFile nnf,
Canceler canceler) throws IOException {
assert editLog != null : "editLog must be initialized";
LOG.info("Save namespace ...");
storage.attemptRestoreRemovedStorage();

boolean editLogWasOpen = editLog.isSegmentOpen();

if (editLogWasOpen) {
editLog.endCurrentLogSegment(true);
}
// 获得最新写入的txID
long imageTxId = getCorrectLastAppliedOrWrittenTxId();
if (!addToCheckpointing(imageTxId)) {
throw new IOException(
"FS image is being downloaded from another NN at txid " + imageTxId);
}
try {
try {
// 这个方法会将文件全部合并为fsimage到指定目录中
saveFSImageInAllDirs(source, nnf, imageTxId, canceler);
if (!source.isRollingUpgrade()) {
updateStorageVersion();
}
} finally {
if (editLogWasOpen) {
editLog.startLogSegmentAndWriteHeaderTxn(imageTxId + 1,
source.getEffectiveLayoutVersion());
// Take this opportunity to note the current transaction.
// Even if the namespace save was cancelled, this marker
// is only used to determine what transaction ID is required
// for startup. So, it doesn't hurt to update it unnecessarily.
storage.writeTransactionIdFileToStorage(imageTxId + 1);
}
}
} finally {
removeFromCheckpointing(imageTxId);
}
//Update NameDirSize Metric
getStorage().updateNameDirSize();

if (exitAfterSave.get()) {
LOG.error("NameNode process will exit now... The saved FsImage " +
nnf + " is potentially corrupted.");
ExitUtil.terminate(-1);
}
}

/**
* @see #saveFSImageInAllDirs(FSNamesystem, NameNodeFile, long, Canceler)
*/
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
throws IOException {
if (!addToCheckpointing(txid)) {
throw new IOException(("FS image is being downloaded from another NN"));
}
try {
saveFSImageInAllDirs(source, NameNodeFile.IMAGE, txid, null);
} finally {
removeFromCheckpointing(txid);
}
}
  • Post title:HDFS CheckPoint 流程源码分析
  • Post author:刘梦凯
  • Create time:2022-10-11 10:14:42
  • Post link:https://liumengkai.github.io/2022/10/11/HDFS-CheckPoint-流程源码分析/
  • Copyright Notice:All articles in this blog are licensed under BY-NC-SA unless stating additionally.