Implementing a Hama BSP Computing Application on Hadoop YARN
/**
 * Creates a basic job with sequencefiles as in and output.
 */
public static BSPJob createJob(Configuration cnf, Path in, Path out,
    boolean textOut) throws IOException {
  HamaConfiguration conf = new HamaConfiguration(cnf);
  BSPJob job = new BSPJob(conf, KMeansBSP.class);
  // To run under the YARN framework, comment out the line above
  // and uncomment the line below.
  // BSPJob job = new YARNBSPJob(conf, KMeansBSP.class);
  // A number of job parameters are initialized here; omitted...
  return job;
}
2.2. YARNBSPJob
To run a BSP application on YARN, a client has to submit the job to the ResourceManager (RM). YARNBSPJob plays that role: it creates applicationsManager, a proxy to the ResourceManager, and communicates with the RM through it.
public class YARNBSPJob extends BSPJob {

  private static final Log LOG = LogFactory.getLog(YARNBSPJob.class);

  private static volatile int id = 0;

  private YARNBSPJobClient submitClient; // the job is submitted through submitClient.launchJob
  private BSPClient client;
  private boolean submitted; // whether this job has been submitted
  private ApplicationReport report; // the application's report
  private ClientRMProtocol applicationsManager; // proxy object for talking to the ResourceManager
  private YarnRPC rpc; // used to create the client-side proxy to the RM

  public YARNBSPJob(HamaConfiguration conf) throws IOException {
    super(conf);
    submitClient = new YARNBSPJobClient(conf);
    YarnConfiguration yarnConf = new YarnConfiguration(conf);
    this.rpc = YarnRPC.create(conf);
    // Create an RM proxy object in preparation for talking to the RM
    InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
        YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS));
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    // Establish the proxy connection to the RM
    this.applicationsManager = ((ClientRMProtocol) rpc.getProxy(
        ClientRMProtocol.class, rmAddress, conf));
  }

  // omitted...
}

2.3. Submitting the application: launchJob
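After the RM proxy has been initialized, the job's class files, XML files and other related files are packed into a jar, the jar is uploaded to HDFS, and the job is then submitted to the RM. Submitting a BSP job starts with requesting a new application from the RM.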
GetNewApplicationRequest request = Records
    .newRecord(GetNewApplicationRequest.class);
GetNewApplicationResponse response = job.getApplicationsManager()
    .getNewApplication(request);
id = response.getApplicationId();
LOG.debug("Got new ApplicationId=" + id);
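Next, a ContainerLaunchContext object is created and filled with the launch settings: the environment variable map (env), the local resources that hold the job files (localResources), the command that starts the AppMaster container (command), and the memory requirement (capability). An ApplicationSubmissionContext is then built from the application id, the application name and the AppMaster container settings above, and the application is submitted to the RM.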
// Create a new ApplicationSubmissionContext
ApplicationSubmissionContext appContext = Records
    .newRecord(ApplicationSubmissionContext.class);
// set the ApplicationId
appContext.setApplicationId(this.id);
// set the application name
appContext.setApplicationName(job.getJobName());

// Create a new container launch context for the AM's container
ContainerLaunchContext amContainer = Records
    .newRecord(ContainerLaunchContext.class);

// Define the local resources required
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
// Lets assume the jar we need for our ApplicationMaster is available in
// HDFS at a certain known path to us and we want to make it available to
// the ApplicationMaster in the launched container
// Initialization of localResources omitted...
// Set the local resources into the launch context
amContainer.setLocalResources(localResources);

// Set up the environment needed for the launch context
Map<String, String> env = new HashMap<String, String>();
// Assuming our classes or jars are available as local resources in the
// working directory from which the command will be run, we need to append
// "." to the path.
// By default, all the hadoop specific classpaths will already be available
// in $CLASSPATH, so we should be careful not to overwrite it.
String classPathEnv = "$CLASSPATH:./*:";
env.put("CLASSPATH", classPathEnv);
amContainer.setEnvironment(env);

// Construct the command to be executed on the launched container:
// this is the command that starts the AppMaster
String command = "${JAVA_HOME}"
    + "/bin/java -cp "
    + classPathEnv
    + " "
    + BSPApplicationMaster.class.getCanonicalName()
    + " "
    + submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
        .toString() + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
    + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
    + "/stderr";
LOG.debug("Start command: " + command);
amContainer.setCommands(Collections.singletonList(command));

Resource capability = Records.newRecord(Resource.class);
// we have at least 3 threads, which consume 1mb each, for each bsptask and
// a base usage of 100mb
capability.setMemory(3 * job.getNumBspTask()
    + getConf().getInt("hama.appmaster.memory.mb", 100));
LOG.info("Set memory for the application master to "
    + capability.getMemory() + "mb!");
amContainer.setResource(capability);

// Set the container launch content into the ApplicationSubmissionContext
appContext.setAMContainerSpec(amContainer);

// Create the request to send to the ApplicationsManager
SubmitApplicationRequest appRequest = Records
    .newRecord(SubmitApplicationRequest.class); // wrap the submission request
appRequest.setApplicationSubmissionContext(appContext);
job.getApplicationsManager().submitApplication(appRequest); // submit the application request
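To make the memory setting concrete, the expression passed to capability.setMemory can be worked through by hand. The following is a minimal sketch in plain Java that only repeats that arithmetic; the class and method names (AmMemoryEstimate, appMasterMemoryMb) are made up for illustration and are not part of Hama.

```java
final class AmMemoryEstimate {

    /**
     * Repeats the expression from the snippet above:
     * 3 MB per BSP task plus a base amount for the AppMaster itself.
     */
    public static int appMasterMemoryMb(int numBspTasks, int baseMb) {
        return 3 * numBspTasks + baseMb;
    }

    public static void main(String[] args) {
        // 100 is the fallback used for hama.appmaster.memory.mb in the snippet
        int defaultBaseMb = 100;
        System.out.println(appMasterMemoryMb(10, defaultBaseMb));  // 130
        System.out.println(appMasterMemoryMb(200, defaultBaseMb)); // 700
    }
}
```

With the default base of 100 MB, a job with 10 BSP tasks asks for 130 MB for its AppMaster, and a job with 200 tasks asks for 700 MB.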
Once the job has been submitted to the RM, the client can query the RM for the running state of the ApplicationMaster.
GetApplicationReportRequest reportRequest = Records
    .newRecord(GetApplicationReportRequest.class);
reportRequest.setApplicationId(id);
while (report == null || report.getHost().equals("N/A")) {
  GetApplicationReportResponse reportResponse = job
      .getApplicationsManager().getApplicationReport(reportRequest);
  // Fetch the AppMaster's running state, including its host and port
  report = reportResponse.getApplicationReport();
  try {
    Thread.sleep(1000L);
  } catch (InterruptedException e) {
    LOG.error(
        "Got interrupted while waiting for a response report from AM.", e);
  }
}

2.4. Job startup in the Hadoop ResourceManager
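After the client submits its request, the RM's ClientRMService calls the RM application event handler (RMAppManager) directly and processes the request synchronously, instead of going through the asynchronous dispatcher that YARN provides. The advantage is that the client learns immediately whether the submitted job was initialized and started successfully. ClientRMService implements the ClientRMProtocol service; it reads the relevant setting (yarn.resourcemanager.address) and opens a server port (default: 8040) to accept requests.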
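The difference between the two paths can be illustrated without any YARN classes. The sketch below is a simplified model in plain Java, not YARN's implementation: DispatchSketch, handleSync and AsyncDispatcher are hypothetical names, and events are plain strings. It shows that a synchronous call has finished, or failed, by the time it returns, while a queued event has not been processed yet when the caller continues.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

final class DispatchSketch {

    /** Synchronous path: the handler runs on the caller's thread, so any failure reaches the caller. */
    public static void handleSync(String event, Consumer<String> handler) {
        handler.accept(event);
    }

    /** Asynchronous path: events are only queued; they are handled later by whoever drains the queue. */
    static final class AsyncDispatcher {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        public void enqueue(String event) {
            queue.add(event);
        }

        /** Handles every queued event on the calling thread and returns how many were handled. */
        public int drain(Consumer<String> handler) {
            int handledCount = 0;
            String event;
            while ((event = queue.poll()) != null) {
                handler.accept(event);
                handledCount++;
            }
            return handledCount;
        }
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();

        handleSync("SUBMIT app_1", handled::add);
        System.out.println(handled.size()); // 1: already handled when handleSync returns

        AsyncDispatcher dispatcher = new AsyncDispatcher();
        dispatcher.enqueue("START app_1");
        System.out.println(handled.size()); // still 1: the event is only queued

        dispatcher.drain(handled::add);
        System.out.println(handled.size()); // 2
    }
}
```

In a real dispatcher the queue is drained by a separate thread; draining on the calling thread here keeps the example deterministic.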
// In submitApplication, RMAppManager.handle is called directly to process the request synchronously
rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
    .currentTimeMillis()));

// Create RMApp
application = new RMAppImpl(applicationId, rmContext,
    this.conf, submissionContext.getApplicationName(), user,
    submissionContext.getQueue(), submissionContext, clientTokenStr,
    appStore, this.scheduler, this.masterService, submitTime);
// omitted...
// All done, start the RMApp
this.rmContext.getDispatcher().getEventHandler().handle(
    new RMAppEvent(applicationId, RMAppEventType.START));

// Register event handler for RmAppEvents
this.rmDispatcher.register(RMAppEventType.class,
    new ApplicationEventDispatcher(this.rmContext));
// Register event handler for RmAppAttemptEvents
this.rmDispatcher.register(RMAppAttemptEventType.class,
    new ApplicationAttemptEventDispatcher(this.rmContext));

RMAppEvent events are therefore consumed by ApplicationEventDispatcher, which calls RMAppImpl.handle to advance the RMAppImpl state machine and create an RMAppAttemptImpl for the AppMaster container. RMAppAttemptImpl likewise maintains its own state machine and runs the corresponding logic based on the current state and the transition being made. When RMAppAttemptImpl enters the allocated state, it asks a NodeManager (NM) to create the container.

private void launch() throws IOException {
  connect();
  // application.getMasterContainer() returns the container that has already
  // been allocated on some NM; it carries that NM's nodeId.
  ContainerId masterContainerID = application.getMasterContainer().getId();
  ApplicationSubmissionContext applicationContext =
      application.getSubmissionContext();
  LOG.info("Setting up container " + application.getMasterContainer()
      + " for AM " + application.getAppAttemptId());
  // Prepare the parameters needed to start the AppMaster container
  ContainerLaunchContext launchContext =
      createAMContainerLaunchContext(applicationContext, masterContainerID);
  StartContainerRequest request =
      recordFactory.newRecordInstance(StartContainerRequest.class);
  request.setContainerLaunchContext(launchContext);
  // Ask the NM, through the NM proxy containerMgrProxy, to start the container
  containerMgrProxy.startContainer(request);
  LOG.info("Done launching container " + application.getMasterContainer()
      + " for AM " + application.getAppAttemptId());
}

2.5. Job startup in the Hadoop NodeManager

// Create the container state machine
Container container = new ContainerImpl(getConfig(), this.dispatcher,
    launchContext, credentials, metrics);
ApplicationId applicationID =
    containerID.getApplicationAttemptId().getApplicationId();
if (context.getContainers().putIfAbsent(containerID, container) != null) {
  NMAuditLogger.logFailure(launchContext.getUser(),
      AuditConstants.START_CONTAINER, "ContainerManagerImpl",
      "Container already running on this node!",
      applicationID, containerID);
  throw RPCUtil.getRemoteException("Container " + containerID
      + " already is running on this node!!");
}

// Create an application
Application application =
    new ApplicationImpl(dispatcher, this.aclsManager,
        launchContext.getUser(), applicationID, credentials, context);
if (null ==
    context.getApplications().putIfAbsent(applicationID, application)) {
  LOG.info("Creating a new application reference for app "
      + applicationID);
  dispatcher.getEventHandler().handle(
      new ApplicationInitEvent(applicationID, container
          .getLaunchContext().getApplicationACLs()));
}

// TODO: Validate the request
dispatcher.getEventHandler().handle(
    new ApplicationContainerInitEvent(container));
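The NodeManager code above relies on putIfAbsent in two different ways: a non-null return value means the container id was already registered (an error), while a null return value for the application id means this is the first container of that application (so the application must be initialized). A minimal sketch of that idiom using java.util.concurrent.ConcurrentHashMap follows; ContainerRegistrySketch and its methods are hypothetical names, and plain strings stand in for the real container and application objects.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ContainerRegistrySketch {

    private final ConcurrentMap<String, String> containers = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, String> applications = new ConcurrentHashMap<>();

    /** Registers a container and rejects a second registration of the same id. */
    public void registerContainer(String containerId, String description) {
        // putIfAbsent returns the previous value, or null if the key was absent
        if (containers.putIfAbsent(containerId, description) != null) {
            throw new IllegalStateException(
                "Container " + containerId + " is already running on this node");
        }
    }

    /** Returns true only for the first registration of an application id. */
    public boolean registerApplication(String applicationId) {
        return applications.putIfAbsent(applicationId, "application") == null;
    }

    public static void main(String[] args) {
        ContainerRegistrySketch registry = new ContainerRegistrySketch();
        registry.registerContainer("container_1", "AppMaster");
        System.out.println(registry.registerApplication("app_1")); // true
        System.out.println(registry.registerApplication("app_1")); // false
        try {
            registry.registerContainer("container_1", "duplicate");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because putIfAbsent is atomic, two concurrent start requests for the same container cannot both succeed, which a separate containsKey check followed by put would not guarantee.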
Finally, org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher.handle(ContainersLauncherEvent) creates the new container process.
switch (event.getType()) {
  case LAUNCH_CONTAINER:
    Application app =
        context.getApplications().get(
            containerId.getApplicationAttemptId().getApplicationId());
    // Prepare the environment variables and parameters for launching a new container
    ContainerLaunch launch =
        new ContainerLaunch(getConfig(), dispatcher, exec, app,
            event.getContainer(), dirsHandler);
    // containerLauncher.submit(launch) starts the new container process;
    // the submitted launch task blocks until the child process exits.
    running.put(containerId,
        new RunningContainer(containerLauncher.submit(launch), launch));
    break;
  case CLEANUP_CONTAINER:
    // omitted...
}
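The pattern in the LAUNCH_CONTAINER branch, submitting a blocking task to an executor and keeping the returned Future in a map keyed by container id, can be sketched with the JDK alone. This is an illustration under simplifying assumptions rather than the NodeManager's code: LauncherSketch, launch and awaitExit are hypothetical names, and a Callable that returns an exit code stands in for the real container launch.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class LauncherSketch {

    private final ExecutorService pool = Executors.newCachedThreadPool();
    private final Map<String, Future<Integer>> running = new ConcurrentHashMap<>();

    /** Submits the launch task; submit() returns at once, the task itself may block for a long time. */
    public void launch(String containerId, Callable<Integer> launchTask) {
        running.put(containerId, pool.submit(launchTask));
    }

    /** Waits for the launch task of the given container and returns its exit code. */
    public int awaitExit(String containerId) {
        Future<Integer> future = running.remove(containerId);
        if (future == null) {
            throw new IllegalArgumentException("Unknown container " + containerId);
        }
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("Interrupted while waiting for " + containerId, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Launch of " + containerId + " failed", e.getCause());
        }
    }

    public void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        LauncherSketch launcher = new LauncherSketch();
        launcher.launch("container_1", () -> 0); // stands in for a process that exits with code 0
        System.out.println(launcher.awaitExit("container_1")); // 0
        launcher.shutdown();
    }
}
```

Keeping the Future per container is what later allows a cleanup request to find the running launch and cancel or wait for it.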
3. Hama AppMaster implementation
Hama implements its AppMaster as org.apache.hama.bsp.BSPApplicationMaster. Its constructor is shown below:
private BSPApplicationMaster(String[] args) throws Exception {
  // The single argument is the path of the job configuration file
  if (args.length != 1) {
    throw new IllegalArgumentException();
  }

  this.jobFile = args[0];
  this.localConf = new YarnConfiguration();
  // Load the parameters submitted by the user
  this.jobConf = getSubmitConfiguration(jobFile);

  // Name of the BSP job
  this.applicationName = jobConf.get("bsp.job.name",
      "<no bsp job name defined>");
  if (applicationName.isEmpty()) {
    this.applicationName = "<no bsp job name defined>";
  }

  // Obtain the appAttemptId
  this.appAttemptId = getApplicationAttemptId();

  // Create a YarnRPC. It belongs to the YARN framework; Hama mainly uses it
  // to create an RM proxy for talking to the ResourceManager.
  this.yarnRPC = YarnRPC.create(localConf);
  this.clock = new SystemClock();
  this.startTime = clock.getTime();

  this.jobId = new BSPJobID(appAttemptId.toString(), 0);

  // The AppMaster opens a port for communicating with the client
  this.hostname = BSPNetUtils.getCanonicalHostname();
  this.clientPort = BSPNetUtils.getFreePort(12000);

  // start our synchronization service: the BSP sync service, which
  // coordinates synchronization through a ZooKeeper server
  startSyncServer();

  // Start clientServer (BSPClient), used by the client, and taskServer
  // (BSPPeerProtocol), used to talk to the BSP task containers; this also
  // sets hama.umbilical.address to hostname:taskServerPort
  startRPCServers();

  /*
   * Make sure that this executes after the start the RPC servers, because we
   * are readjusting the configuration.
   */
  // Write the configuration changed above back to the file on HDFS
  rewriteSubmitConfiguration(jobFile, jobConf);

  // Read the split list, computed on the client side, from HDFS
  String jobSplit = jobConf.get("bsp.job.split.file");
  splits = null;
  if (jobSplit != null) {
    DataInputStream splitFile = fs.open(new Path(jobSplit));
    try {
      splits = BSPJobClient.readSplitFile(splitFile);
    } finally {
      splitFile.close();
    }
  }

  // Create the proxy the AppMaster uses to talk to the ResourceManager
  this.amrmRPC = getYarnRPCConnection(localConf);
  registerApplicationMaster(amrmRPC, appAttemptId, hostname, clientPort,
      "http://localhost:8080");
}
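The constructor picks its client port with BSPNetUtils.getFreePort(12000); how that helper works internally is not shown in this article. As an illustration only, one common way to find a free port at or above a starting value is to try binding a ServerSocket to successive ports. The sketch below assumes that approach; FreePortSketch and findFreePort are hypothetical names and should not be read as Hama's implementation.

```java
import java.io.IOException;
import java.net.ServerSocket;

final class FreePortSketch {

    /**
     * Returns the first port at or above startPort that can currently be bound.
     * The probe socket is closed again before the port number is returned.
     */
    public static int findFreePort(int startPort) {
        for (int port = startPort; port <= 65535; port++) {
            try (ServerSocket probe = new ServerSocket(port)) {
                return probe.getLocalPort();
            } catch (IOException e) {
                // The port is in use or not bindable; try the next one.
            }
        }
        throw new IllegalStateException("No free port at or above " + startPort);
    }

    public static void main(String[] args) {
        int port = findFreePort(12000);
        System.out.println("Free port found: " + port);
    }
}
```

A probe like this is inherently racy: another process can take the port between the probe and the moment the real server binds it, so the caller still has to handle a bind failure.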
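startRPCServers() creates two RPC servers. The first, clientServer, serves the client that submitted the job; its interface is BSPClient, whose getCurrentSuperStep() method reports which superstep the job has reached. The second, taskServer, communicates with the task nodes that belong to this job; its interface is BSPPeerProtocol, which has two important methods: getTask (returns the node's task information and is used when a task node initializes) and statusUpdate (updates the state of a task node).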
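Once the AppMaster has been initialized, it requests cluster resources from the RM. After the RM has worked out the allocation, it returns a list of NodeManager nodes, and the AppMaster starts its BSP tasks on those nodes.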
@Override
public JobState startJob() throws Exception {
  // Initialize the list; its capacity is bsp.peers.num
  this.allocatedContainers = new ArrayList<Container>(numBSPTasks);
  while (allocatedContainers.size() < numBSPTasks) {
    // Ask the ResourceManager for BSP task containers. createBSPTaskRequest
    // builds the list of ResourceRequest objects; releasedContainers is the
    // list of containerIds this job no longer uses and wants to release.
    AllocateRequest req = BuilderUtils.newAllocateRequest(
        appAttemptId,
        lastResponseID,
        0.0f,
        createBSPTaskRequest(numBSPTasks - allocatedContainers.size(),
            taskMemoryInMb, priority), releasedContainers);

    // Request resources from the RM in order to start numBSPTasks BSP containers
    AllocateResponse allocateResponse = resourceManager.allocate(req);
    AMResponse amResponse = allocateResponse.getAMResponse();
    LOG.info("Got response! ID: " + amResponse.getResponseId()
        + " with num of containers: "
        + amResponse.getAllocatedContainers().size()
        + " and following resources: "
        + amResponse.getAvailableResources().getMemory() + "mb");
    this.lastResponseID = amResponse.getResponseId();

    // availableResources = amResponse.getAvailableResources();
    this.allocatedContainers.addAll(amResponse.getAllocatedContainers());
    LOG.info("Waiting to allocate "
        + (numBSPTasks - allocatedContainers.size()) + " more containers...");
    Thread.sleep(1000L);
  }

  // At this point the RM has allocated numBSPTasks containers to this application
  LOG.info("Got " + allocatedContainers.size() + " containers!");

  int id = 0;
  for (Container allocatedContainer : allocatedContainers) {
    LOG.info("Launching task on a new container." + ", containerId="
        + allocatedContainer.getId() + ", containerNode="
        + allocatedContainer.getNodeId().getHost() + ":"
        + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
        + allocatedContainer.getNodeHttpAddress() + ", containerState"
        + allocatedContainer.getState() + ", containerResourceMemory"
        + allocatedContainer.getResource().getMemory());

    // Connect to ContainerManager on the allocated container,
    // i.e. open a connection to the NM that owns the container
    String cmIpPortStr = allocatedContainer.getNodeId().getHost() + ":"
        + allocatedContainer.getNodeId().getPort();
    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);
    // NM proxy
    ContainerManager cm = (ContainerManager) yarnRPC.getProxy(
        ContainerManager.class, cmAddress, conf);
    // Build a BSPTaskLauncher that will start the container
    BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id,
        allocatedContainer, cm, conf, jobFile, jobId);

    launchers.put(id, runnableLaunchContainer);
    // Start the container. start() calls setupContainer to pass the Java
    // command line, the environment variables and the BSP parameters to the
    // NM, then calls the NM's containerManager service
    // (org.apache.hadoop.yarn.api.ContainerManager.startContainer(StartContainerRequest))
    // to start a BSPRunner container as a child process and to obtain the
    // container's status.
    runnableLaunchContainer.start();
    completionQueue.add(runnableLaunchContainer);
    id++;
  }

  LOG.info("Waiting for tasks to finish...");
  state = JobState.RUNNING;
  int completed = 0;
  while (completed != numBSPTasks) {
    for (BSPTaskLauncher task : completionQueue) {
      // Fetch the status from each runnableLaunchContainer
      BSPTaskStatus returnedTask = task.poll();
      // if our task returned with a finished state
      if (returnedTask != null) {
        if (returnedTask.getExitStatus() != 0) {
          LOG.error("Task with id \"" + returnedTask.getId() + "\" failed!");
          state = JobState.FAILED;
          return state;
        } else {
          LOG.info("Task \"" + returnedTask.getId()
              + "\" successfully finished!");
          completed++;
          LOG.info("Waiting for " + (numBSPTasks - completed)
              + " tasks to finish!");
        }
        cleanupTask(returnedTask.getId());
      }
    }
    Thread.sleep(1000L);
  }

  state = JobState.SUCCESS;
  return state;
}
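The allocation loop at the start of startJob() can be reduced to a small, testable shape: keep asking for the containers that are still missing until enough have been granted, because a single response may contain fewer containers than requested. The sketch below is a simulation in plain Java under stated assumptions, not Hama code: AllocationLoopSketch and allocateAll are hypothetical names, the allocator is a function standing in for resourceManager.allocate, container ids are plain strings, and the one-second sleep between rounds is left out. Unlike the original loop, it caps the number of rounds and ignores grants beyond what was requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;

final class AllocationLoopSketch {

    /**
     * Repeatedly asks the allocator for the containers still missing until
     * `needed` containers have been granted or `maxRounds` requests were made.
     */
    public static List<String> allocateAll(int needed, int maxRounds,
                                           IntFunction<List<String>> allocator) {
        List<String> allocated = new ArrayList<>(needed);
        int rounds = 0;
        while (allocated.size() < needed) {
            if (rounds++ >= maxRounds) {
                throw new IllegalStateException("Only got " + allocated.size()
                    + " of " + needed + " containers after " + maxRounds + " rounds");
            }
            int missing = needed - allocated.size();
            for (String container : allocator.apply(missing)) {
                if (allocated.size() < needed) { // ignore anything beyond the request
                    allocated.add(container);
                }
            }
        }
        return allocated;
    }

    public static void main(String[] args) {
        int[] nextId = {0};
        // Fake allocator: grants at most two containers per request.
        IntFunction<List<String>> twoAtATime = missing -> {
            List<String> granted = new ArrayList<>();
            for (int i = 0; i < Math.min(2, missing); i++) {
                granted.add("container_" + nextId[0]++);
            }
            return granted;
        };
        List<String> containers = allocateAll(5, 10, twoAtATime);
        System.out.println(containers.size()); // 5
    }
}
```

With an allocator that grants two containers per request, five containers arrive over three rounds (2, 2 and 1), which mirrors the "Waiting to allocate N more containers..." log lines in the original loop.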