A Source-Code Walkthrough of How Hadoop Submits Jobs to YARN
刘梦凯

Talk is cheap. Show me the code!

The overall flow of Hadoop submitting a job to YARN

Overall flow:

1) Job submission

Step 1: The Client calls job.waitForCompletion to submit the MapReduce job to the cluster (a minimal driver illustrating this call is sketched after Step 15).

Step 2: The Client asks the ResourceManager (RM) for a job ID.

Step 3: The RM returns the job ID and the staging path to which the job's resources should be submitted.

Step 4: The Client uploads the job jar, the split metadata, and the configuration files to that staging path.

Step 5: After the resources are uploaded, the Client asks the RM to launch the MRAppMaster.

2) Job initialization

Step 6: When the RM receives the Client's request, it adds the job to the capacity scheduler.

Step 7: An idle NodeManager (NM) picks up the job.

Step 8: That NM creates a Container and launches the MRAppMaster in it.

Step 9: The MRAppMaster downloads the resources submitted by the Client to the local node.

3) Task allocation

Step 10: The MRAppMaster requests resources from the RM to run multiple MapTasks.

Step 11: The RM assigns the MapTasks to two other NodeManagers; each of them picks up its task and creates a container.

4) Task execution

Step 12: The MRAppMaster sends the launch scripts to the two NodeManagers that received the tasks; each NodeManager starts a MapTask, and the MapTasks partition and sort their data.

Step 13: After all MapTasks have finished, the MRAppMaster requests containers from the RM to run the ReduceTasks.

Step 14: Each ReduceTask fetches the data of its partition from the MapTasks.

Step 15: When the job is done, the MRAppMaster asks the RM to unregister itself.
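
The whole chain below is kicked off by a single call in user code. As a point of reference, here is a minimal driver sketch (the class name MinimalDriver and the job name are illustrative placeholders, not part of the analyzed source) that ends with the job.waitForCompletion call traced in the rest of this article:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinimalDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Job.getInstance leaves the job in JobState.DEFINE
    Job job = Job.getInstance(conf, "minimal-job");
    job.setJarByClass(MinimalDriver.class);
    // No mapper/reducer is set: the identity Mapper/Reducer defaults are used,
    // which is enough to exercise the submission path.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Step 1 above: this call goes into Job.submit() and the code traced below
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}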

Code analysis

The overall flow in the code:

The call chain across the classes/interfaces involved below:

Job.waitForCompletion() -> Job.submit() -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.submitJobInternal() -> YARNRunner.submitJob() -> ResourceMgrDelegate.submitApplication() -> YarnClientImpl.submitApplication() -> ApplicationClientProtocolPBClientImpl.submitApplication() -> ApplicationClientProtocolPBServiceImpl.submitApplication() -> ClientRMService.submitApplication() -> RMAppManager.submitApplication()

(1)

// First, waitForCompletion is called to start the job
Job.java:

Job.waitForCompletion()

(2)

// waitForCompletion calls submit()
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  ...
}

// Submit the job to the cluster and return immediately.
public void submit()
       throws IOException, InterruptedException, ClassNotFoundException {

  ensureState(JobState.DEFINE);
  setUseNewAPI();
  // connect() makes sure the cluster object exists. Cluster "provides a way to access information
  // about the map/reduce cluster" -- it is our entry point to the MapReduce cluster.
  // In Cluster.initialize() the matching client is created from providerList. The
  // mapreduce.framework.name property decides which framework the job runs on: local or yarn.
  // YARN jobs are handled by YarnClientProtocolProvider; a framework only needs to implement the
  // corresponding interface to run its jobs on YARN. YarnClientProtocolProvider.create() builds
  // resMgrDelegate (a ResourceMgrDelegate), whose constructor creates its field client, a
  // YarnClientImpl. The job is ultimately submitted through YarnClientImpl.submitApplication().
  // (A configuration sketch for mapreduce.framework.name follows this code block.)
  connect();

  // Create the JobSubmitter; the job is then handed to its submitJobInternal method
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {

    public JobStatus run() throws IOException, InterruptedException,
                                  ClassNotFoundException {
      // submitJobInternal is the method to watch here
      return submitter.submitJobInternal(Job.this, cluster);
    }

  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}

// Inside submitJobInternal:
// - sets the job ID, the path the client should upload resources to, the queue, etc.
// - copyAndConfigureFiles uploads the jar to the cluster
// - writeSplits uploads the split metadata
// - writeConf uploads the configuration file
// - submitClient.submitJob asks the ResourceManager to accept the job; it is an interface method,
//   implemented by the concrete subclasses. Since we submit to YARN, YARNRunner.submitJob is used;
//   the other implementation is LocalJobRunner.
JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException,
    InterruptedException, IOException {
  ...
  // get the job ID
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    // configure the path where the job resources should be submitted
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    ...
    copyAndConfigureFiles(job, submitJobDir);
    ...

    ...
    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);
    ...

    // Submit the job. submitClient is of the ClientProtocol interface; its implementations are
    // LocalJobRunner and YARNRunner, and here YARNRunner.submitJob is the one that gets called.

    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
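
As the comment in submit() notes, which ClientProtocolProvider wins is driven purely by the mapreduce.framework.name property. A minimal, hedged sketch of selecting YARN from client code (the class name FrameworkSelection is a placeholder; the string keys are the standard Hadoop configuration names that MRConfig.FRAMEWORK_NAME and MRConfig.YARN_FRAMEWORK_NAME resolve to):

import org.apache.hadoop.conf.Configuration;

public class FrameworkSelection {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // MRConfig.FRAMEWORK_NAME in the source above is the key "mapreduce.framework.name";
    // MRConfig.YARN_FRAMEWORK_NAME is the value "yarn".
    conf.set("mapreduce.framework.name", "yarn");
    // With "yarn", Cluster.initialize() picks YarnClientProtocolProvider, which creates a YARNRunner.
    // With the default "local", LocalClientProtocolProvider / LocalJobRunner would be used instead.
    System.out.println(conf.get("mapreduce.framework.name"));
  }
}

In practice this key is usually set once in mapred-site.xml rather than in code, but the effect on provider selection is the same.
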
// Cluster.java: the initialization method.
// Cluster initialization uses the create() method of YarnClientProtocolProvider, an implementation
// of the ClientProtocolProvider interface (a simplified ServiceLoader sketch of the provider
// discovery follows this block).
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {

  initProviderList();
  final IOException initEx = new IOException(
      "Cannot initialize Cluster. Please check your configuration for "
          + MRConfig.FRAMEWORK_NAME
          + " and the correspond server addresses.");
  if (jobTrackAddr != null) {
    LOG.info(
        "Initializing cluster for Job Tracker=" + jobTrackAddr.toString());
  }
  // ClientProtocolProvider supplies the ClientProtocol used for client communication;
  // since we are talking to YARN, YarnClientProtocolProvider is the one used here.
  for (ClientProtocolProvider provider : providerList) {
    LOG.debug("Trying ClientProtocolProvider : "
        + provider.getClass().getName());
    ClientProtocol clientProtocol = null;
    try {
      if (jobTrackAddr == null) {
        // This calls YarnClientProtocolProvider.create(). Note that create() builds a YARNRunner,
        // whose constructor creates a ResourceMgrDelegate; ResourceMgrDelegate extends the abstract
        // class YarnClient and is our main channel for communicating with the YARN RM.
        // ResourceMgrDelegate's field client is a YarnClientImpl, whose submitApplication method
        // is used later when the job is actually submitted.
        // YARNRunner: "This class enables the current JobClient (0.22 hadoop) to run on YARN."
        /*
        public ResourceMgrDelegate(YarnConfiguration conf) {
          super(ResourceMgrDelegate.class.getName());
          this.conf = conf;
          this.client = YarnClient.createYarnClient(); // this client is a YarnClientImpl
          init(conf);
          start();
        }
        */

        /* YarnClientProtocolProvider.create()
        public ClientProtocol create(Configuration conf) throws IOException {
          if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
            return new YARNRunner(conf);
          }
          return null;
        }
        */

        /* YARNRunner constructor
        public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
            ClientCache clientCache) {
          this.conf = conf;
          try {
            this.resMgrDelegate = resMgrDelegate; // resMgrDelegate is the main channel to the RM
            this.clientCache = clientCache;
            this.defaultFileContext = FileContext.getFileContext(this.conf);
          } catch (UnsupportedFileSystemException ufe) {
            throw new RuntimeException("Error in instantiating YarnClient", ufe);
          }
        }
        */
        clientProtocol = provider.create(conf);
      } else {
        clientProtocol = provider.create(jobTrackAddr, conf);
      }

      if (clientProtocol != null) {
        clientProtocolProvider = provider;
        client = clientProtocol;
        LOG.debug("Picked " + provider.getClass().getName()
            + " as the ClientProtocolProvider");
        break;
      } else {
        LOG.debug("Cannot pick " + provider.getClass().getName()
            + " as the ClientProtocolProvider - returned null protocol");
      }
    } catch (Exception e) {
      final String errMsg = "Failed to use " + provider.getClass().getName()
          + " due to error: ";
      initEx.addSuppressed(new IOException(errMsg, e));
      LOG.info(errMsg, e);
    }
  }

  if (null == clientProtocolProvider || null == client) {
    throw initEx;
  }
}
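
initProviderList() populates providerList via Java's standard ServiceLoader mechanism: every ClientProtocolProvider implementation on the classpath is discoverable, and the loop above simply asks each one to create a ClientProtocol until one returns non-null. A simplified, hedged sketch of that discovery-and-selection idea (the class name ProviderLookup is a placeholder; this is not the exact Hadoop code):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;

public class ProviderLookup {
  // Collect every provider on the classpath (e.g. YarnClientProtocolProvider,
  // LocalClientProtocolProvider), then ask each one to create a ClientProtocol;
  // the first non-null result wins.
  public static ClientProtocol pickProtocol(Configuration conf) throws IOException {
    List<ClientProtocolProvider> providers = new ArrayList<>();
    for (ClientProtocolProvider p : ServiceLoader.load(ClientProtocolProvider.class)) {
      providers.add(p);
    }
    for (ClientProtocolProvider provider : providers) {
      ClientProtocol protocol = provider.create(conf); // null if this provider does not apply
      if (protocol != null) {
        return protocol; // e.g. a YARNRunner when mapreduce.framework.name=yarn
      }
    }
    throw new IOException("Cannot initialize Cluster: no usable ClientProtocolProvider");
  }
}
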
// After the Client has uploaded its resources, it asks the RM to launch the MRAppMaster.
// This is YARNRunner.submitJob.
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {

  addHistoryToken(ts);

  // createApplicationSubmissionContext prepares all the configuration needed to launch the MR AM,
  // e.g. setting the applicationId and setting up the ContainerLaunchContext for the AM container
  // (a generic sketch of building such a context follows this block).
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  try {
    // resMgrDelegate is a ResourceMgrDelegate; it handles communication with the RM service.
    // Its submitApplication method is called with the application context prepared above.
    // Underneath, submitApplication calls client.submitApplication(appContext), where client is
    // declared as the abstract class YarnClient; for submissions to YARN its implementation is
    // YarnClientImpl, so YarnClientImpl.submitApplication() is what actually runs.
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);

    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    ...
}
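
createApplicationSubmissionContext is where the MR client describes the ApplicationMaster container to YARN. A hedged sketch, using the generic YARN records API rather than the MR-specific code, of the kind of context that gets built (the class name, AM command line, queue, and resource sizes are illustrative values, not what YARNRunner actually computes):

import java.util.Collections;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;

public class AmContextSketch {
  // Builds an ApplicationSubmissionContext roughly the way an AM-launching client does:
  // a name and queue, a ContainerLaunchContext holding the AM start command (the real code also
  // adds local resources such as the job jar, plus environment variables and tokens), and the
  // resources the AM container needs. The applicationId obtained earlier from the RM would also
  // be set on this context before submission.
  static ApplicationSubmissionContext buildContext() {
    ContainerLaunchContext amContainer =
        Records.newRecord(ContainerLaunchContext.class);
    amContainer.setCommands(Collections.singletonList(
        "$JAVA_HOME/bin/java MyAppMaster 1>stdout 2>stderr")); // illustrative AM command

    ApplicationSubmissionContext appContext =
        Records.newRecord(ApplicationSubmissionContext.class);
    appContext.setApplicationName("my-mapreduce-job");     // illustrative name
    appContext.setQueue("default");                        // illustrative queue
    appContext.setPriority(Priority.newInstance(0));
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(Resource.newInstance(1536, 1)); // memory (MB) and vcores for the AM
    return appContext;
  }
}
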
// YarnClientImpl.java: submitApplication():
// request.setApplicationSubmissionContext(appContext) puts the application context into the SubmitApplicationRequest
// rmClient.submitApplication(request) submits the application
// rmClient is of the interface type ApplicationClientProtocol, and submitApplication is a method of
// that interface. The ApplicationClientProtocol protocol covers the interaction between the Client
// and the ResourceManager: its main functions are to submit/abort jobs, get information from
// applications, and get cluster metrics. (A sketch of driving the YarnClient API directly follows
// this block.)
@Override
public ApplicationId
    submitApplication(ApplicationSubmissionContext appContext)
        throws YarnException, IOException {
  ...
  SubmitApplicationRequest request =
      Records.newRecord(SubmitApplicationRequest.class);

  request.setApplicationSubmissionContext(appContext);
  ...
  //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
  rmClient.submitApplication(request);
  ...
  return applicationId;
}
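
ResourceMgrDelegate is essentially a thin wrapper around this YarnClient/YarnClientImpl API. A hedged sketch of driving that API directly, mirroring the rmClient.submitApplication path above (the class name DirectYarnSubmit is a placeholder; filling in the submission context is elided and would reuse the kind of setup sketched in the previous section):

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class DirectYarnSubmit {
  public static void main(String[] args) throws Exception {
    // Same calls ResourceMgrDelegate makes in its constructor: create, init, start.
    YarnConfiguration conf = new YarnConfiguration();
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(conf);
    yarnClient.start();

    // Ask the RM for a new application (this is where the application ID comes from).
    YarnClientApplication app = yarnClient.createApplication();
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    // ... fill in the context here (AM command, queue, resources), as in the previous sketch ...

    // YarnClientImpl.submitApplication() wraps this context in a SubmitApplicationRequest
    // and hands it to rmClient, the ApplicationClientProtocol proxy to the ResourceManager.
    ApplicationId appId = yarnClient.submitApplication(appContext);
    ApplicationReport report = yarnClient.getApplicationReport(appId);
    System.out.println("Submitted " + appId + ", state: " + report.getYarnApplicationState());

    yarnClient.stop();
  }
}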

What follows involves the RPC between the client side and the server side, which I have not studied yet, so I will stop here for now.

References:

https://cloud.tencent.com/developer/article/1889678

https://www.cnblogs.com/zsql/p/11276160.html

https://max.book118.com/html/2021/1111/7110106022004041.shtm
