Talk is cheap. Show me the code!
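How Hadoop submits a job to YARN
Overall flow:
1) Job submission
Step 1: The Client calls job.waitForCompletion() to submit a MapReduce job to the cluster.
Step 2: The Client asks the RM (ResourceManager) for a job ID.
Step 3: The RM returns the job's resource submission path and the job ID to the Client.
Step 4: The Client uploads the jar, the split information, and the configuration file to that submission path.
Step 5: Once the resources are uploaded, the Client asks the RM to run the MRAppMaster.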
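2) Job initialization
Step 6: On receiving the Client's request, the RM adds the job to the Capacity Scheduler.
Step 7: An idle NM (NodeManager) picks up the job.
Step 8: That NM creates a Container and starts the MRAppMaster in it.
Step 9: The resources submitted by the Client are downloaded to the local node.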
3) Task allocation
Step 10: The MRAppMaster asks the RM for resources to run multiple MapTasks.
Step 11: The RM assigns the MapTasks to two other NodeManagers; each of them picks up its task and creates a container.
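4) Task execution
Step 12: The MRAppMaster sends the program launch script to the two NodeManagers that received tasks. Each of them starts a MapTask, and the MapTasks partition and sort the data.
Step 13: After all MapTasks have finished, the MRAppMaster asks the RM for containers to run the ReduceTasks.
Step 14: Each ReduceTask fetches the data of its own partition from the MapTasks.
Step 15: When the program has finished, the MRAppMaster asks the RM to deregister it.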
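Steps 12 and 14 can be made concrete with a small model. The sketch below is a toy word count in plain Java, not Hadoop code: the class and method names are made up for illustration, and String keys stand in for Hadoop's Writable keys. The one part taken from Hadoop is the partition formula, which is the one used by the default HashPartitioner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of map-side partitioning/sorting and reduce-side fetching. */
final class ShuffleSketch {

    /** Same formula as Hadoop's default HashPartitioner. */
    static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    /** One map task: counts words into one sorted map per partition. */
    static List<TreeMap<String, Integer>> runMapTask(List<String> words, int numReduceTasks) {
        List<TreeMap<String, Integer>> partitions = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            partitions.add(new TreeMap<>()); // TreeMap keeps the keys sorted
        }
        for (String word : words) {
            partitions.get(partitionFor(word, numReduceTasks)).merge(word, 1, Integer::sum);
        }
        return partitions;
    }

    /** Reduce task r: pulls partition r from every map task and merges the counts. */
    static TreeMap<String, Integer> runReduceTask(
            int r, List<List<TreeMap<String, Integer>>> mapOutputs) {
        TreeMap<String, Integer> merged = new TreeMap<>();
        for (List<TreeMap<String, Integer>> oneMapOutput : mapOutputs) {
            for (Map.Entry<String, Integer> e : oneMapOutput.get(r).entrySet()) {
                merged.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        int numReduceTasks = 2;
        List<List<TreeMap<String, Integer>>> mapOutputs = new ArrayList<>();
        mapOutputs.add(runMapTask(List.of("yarn", "hadoop", "yarn"), numReduceTasks));
        mapOutputs.add(runMapTask(List.of("hadoop", "job"), numReduceTasks));
        for (int r = 0; r < numReduceTasks; r++) {
            System.out.println("reduce " + r + " -> " + runReduceTask(r, mapOutputs));
        }
    }
}
```

Because the partition depends only on the key, every occurrence of a word ends up at the same reduce task no matter which map task produced it.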
Code walkthrough
Overall flow in the code:
Relationships between the classes/interfaces involved below:
Job.waitForCompletion() -> Job.submit() -> Job.connect() -> Cluster.Cluster() -> Cluster.initialize() -> YarnClientProtocolProvider.create() -> JobSubmitter.submitJobInternal() -> YARNRunner.submitJob() -> ResourceMgrDelegate.submitApplication() -> YarnClientImpl.submitApplication() -> ApplicationClientProtocolPBClientImpl.submitApplication() -> ApplicationClientProtocolPBServiceImpl.submitApplication() -> ClientRMService.submitApplication() -> RMAppManager.submitApplication()
(1)
JobClient.java: JobClient.waitForCompletion()
(2)
    // Job.java
    public boolean waitForCompletion(boolean verbose)
            throws IOException, InterruptedException, ClassNotFoundException {
        if (state == JobState.DEFINE) {
            submit();
        }
        ...
    }

    public void submit()
            throws IOException, InterruptedException, ClassNotFoundException {
        ensureState(JobState.DEFINE);
        setUseNewAPI();
        connect(); // creates the Cluster, which picks the client protocol
        final JobSubmitter submitter =
                getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
        status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
            public JobStatus run()
                    throws IOException, InterruptedException, ClassNotFoundException {
                return submitter.submitJobInternal(Job.this, cluster);
            }
        });
        state = JobState.RUNNING;
        LOG.info("The url to track the job: " + getTrackingURL());
    }

    // JobSubmitter.java
    JobStatus submitJobInternal(Job job, Cluster cluster)
            throws ClassNotFoundException, InterruptedException, IOException {
        ...
        // Step 2: ask for a job ID, then derive the per-job submit directory.
        JobID jobId = submitClient.getNewJobID();
        job.setJobID(jobId);
        Path submitJobDir = new Path(jobStagingArea, jobId.toString());
        JobStatus status = null;
        try {
            conf.set(MRJobConfig.USER_NAME,
                    UserGroupInformation.getCurrentUser().getShortUserName());
            conf.set("hadoop.http.filter.initializers",
                    "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
            conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
            LOG.debug("Configuring job " + jobId + " with " + submitJobDir
                    + " as the submit dir");
            ...
            // Step 4: upload the jar and other files to the submit directory.
            copyAndConfigureFiles(job, submitJobDir);
            ...
            ...
            // Step 4: compute the input splits and write them out.
            LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
            int maps = writeSplits(job, submitJobDir);
            conf.setInt(MRJobConfig.NUM_MAPS, maps);
            LOG.info("number of splits:" + maps);
            ...
            // Step 5: actually submit the job.
            status = submitClient.submitJob(
                    jobId, submitJobDir.toString(), job.getCredentials());
            if (status != null) {
                return status;
            } else {
                throw new IOException("Could not launch job");
            }
        } finally {
            // If submission failed, remove what was staged.
            if (status == null) {
                LOG.info("Cleaning up the staging area " + submitJobDir);
                if (jtFs != null && submitJobDir != null)
                    jtFs.delete(submitJobDir, true);
            }
        }
    }
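The control flow of submitJobInternal (get an ID, stage files, submit, clean up on failure) can be tried out without a cluster. The following is a minimal sketch in plain Java under loud assumptions: ToyClient, FixedClient, and the actions list are hypothetical stand-ins invented for this illustration, and strings replace the real JobID, Path, and JobStatus types.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Toy model of the stage-then-submit flow; none of these types are Hadoop classes. */
final class SubmitFlowSketch {

    /** Hypothetical stand-in for the protocol the client uses to reach the cluster. */
    interface ToyClient {
        String getNewJobId();
        String submitJob(String jobId, String submitDir); // null means the launch failed
    }

    /** A client that always hands out the same job ID and the same submit result. */
    static final class FixedClient implements ToyClient {
        private final String status;
        FixedClient(String status) { this.status = status; }
        @Override public String getNewJobId() { return "job_0001"; }
        @Override public String submitJob(String jobId, String submitDir) { return status; }
    }

    /** Ordered record of what the flow did, so it can be inspected afterwards. */
    final List<String> actions = new ArrayList<>();

    String submitJobInternal(ToyClient client, String stagingArea) throws IOException {
        String jobId = client.getNewJobId();
        String submitDir = stagingArea + "/" + jobId;
        String status = null;
        try {
            actions.add("copy files to " + submitDir);
            actions.add("write splits to " + submitDir);
            status = client.submitJob(jobId, submitDir);
            if (status != null) {
                return status;
            }
            throw new IOException("Could not launch job");
        } finally {
            // Runs on both paths; only cleans up when no status came back.
            if (status == null) {
                actions.add("delete " + submitDir);
            }
        }
    }

    public static void main(String[] args) {
        SubmitFlowSketch failed = new SubmitFlowSketch();
        try {
            failed.submitJobInternal(new FixedClient(null), "/staging");
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(failed.actions);
    }
}
```

The point of keeping `status` outside the `try` is that the `finally` block can tell a successful return from any failure, including an exception thrown while staging files.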
    // Cluster.java
    private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
            throws IOException {
        initProviderList();
        final IOException initEx = new IOException(
                "Cannot initialize Cluster. Please check your configuration for "
                + MRConfig.FRAMEWORK_NAME
                + " and the correspond server addresses.");
        if (jobTrackAddr != null) {
            LOG.info("Initializing cluster for Job Tracker=" + jobTrackAddr.toString());
        }
        // Try each provider in turn; the first one that returns a protocol wins.
        for (ClientProtocolProvider provider : providerList) {
            LOG.debug("Trying ClientProtocolProvider : "
                    + provider.getClass().getName());
            ClientProtocol clientProtocol = null;
            try {
                if (jobTrackAddr == null) {
                    clientProtocol = provider.create(conf);
                } else {
                    clientProtocol = provider.create(jobTrackAddr, conf);
                }
                if (clientProtocol != null) {
                    clientProtocolProvider = provider;
                    client = clientProtocol;
                    LOG.debug("Picked " + provider.getClass().getName()
                            + " as the ClientProtocolProvider");
                    break;
                } else {
                    LOG.debug("Cannot pick " + provider.getClass().getName()
                            + " as the ClientProtocolProvider - returned null protocol");
                }
            } catch (Exception e) {
                // Remember the failure, but keep trying the remaining providers.
                final String errMsg = "Failed to use " + provider.getClass().getName()
                        + " due to error: ";
                initEx.addSuppressed(new IOException(errMsg, e));
                LOG.info(errMsg, e);
            }
        }
        if (null == clientProtocolProvider || null == client) {
            throw initEx;
        }
    }
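The loop above is a "first provider that answers wins" pattern with failures collected as suppressed exceptions. Here is a minimal sketch of that pattern in plain Java. ToyProvider and pickClient are hypothetical names for this illustration, strings stand in for ClientProtocol objects, and the two lambdas only imitate how the local and YARN providers return null when the configured framework name is not theirs.

```java
import java.io.IOException;
import java.util.List;

/** Toy model of provider selection; not the real ClientProtocolProvider API. */
final class ProviderSelectionSketch {

    /** Hypothetical provider: returns null when it does not handle the framework. */
    interface ToyProvider {
        String create(String frameworkName) throws IOException;
    }

    static String pickClient(List<ToyProvider> providers, String frameworkName)
            throws IOException {
        IOException initEx =
                new IOException("Cannot initialize Cluster for framework " + frameworkName);
        for (ToyProvider provider : providers) {
            try {
                String client = provider.create(frameworkName);
                if (client != null) {
                    return client; // first non-null protocol wins
                }
            } catch (Exception e) {
                initEx.addSuppressed(e); // keep the cause, try the next provider
            }
        }
        throw initEx; // nobody answered; suppressed causes explain why
    }

    public static void main(String[] args) throws IOException {
        ToyProvider local = name -> "local".equals(name) ? "LocalJobRunner" : null;
        ToyProvider yarn = name -> "yarn".equals(name) ? "YARNRunner" : null;
        System.out.println(pickClient(List.of(local, yarn), "yarn")); // prints "YARNRunner"
    }
}
```

Collecting failures with addSuppressed means a misconfigured cluster produces one exception that still carries every provider's individual error.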
    // YARNRunner.java
    @Override
    public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
            throws IOException, InterruptedException {
        addHistoryToken(ts);
        // Describe the application (the MRAppMaster) to be launched.
        ApplicationSubmissionContext appContext =
                createApplicationSubmissionContext(conf, jobSubmitDir, ts);
        try {
            ApplicationId applicationId =
                    resMgrDelegate.submitApplication(appContext);
            ApplicationReport appMaster =
                    resMgrDelegate.getApplicationReport(applicationId);
            ...
    }
    // YarnClientImpl.java
    @Override
    public ApplicationId submitApplication(ApplicationSubmissionContext appContext)
            throws YarnException, IOException {
        ...
        // Wrap the context in a request record and send it to the RM.
        SubmitApplicationRequest request =
                Records.newRecord(SubmitApplicationRequest.class);
        request.setApplicationSubmissionContext(appContext);
        ...
        rmClient.submitApplication(request);
        ...
        return applicationId;
    }
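What follows involves client-to-server and server-to-server communication, which I have not studied yet, so I will stop here for now.
References: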
https://cloud.tencent.com/developer/article/1889678
https://www.cnblogs.com/zsql/p/11276160.html
https://max.book118.com/html/2021/1111/7110106022004041.shtm