首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

Hadoop Yarn上兑现Hama BSP计算应用

2013-07-08 
Hadoop Yarn上实现Hama BSP计算应用/*** Creates a basic job with sequencefiles as in and output.*/pub

Hadoop Yarn上实现Hama BSP计算应用
/** * Creates a basic job with sequencefiles as in and output. */ public static BSPJob createJob(Configuration cnf, Path in, Path out, boolean textOut) throws IOException { HamaConfiguration conf = new HamaConfiguration(cnf); BSPJob job = new BSPJob(conf, KMeansBSP.class); //用于yarn框架下,请把上面一行注释掉,再把下一行取消注释 //BSPJob job = new YarnBSPJob(conf, KMeansBSP.class); //此处初始化一堆job参数, 省略... return job; }

?

2.2.?YARNBSPJob

要使BSP应用在Yarn上运行,需要有client向resourcemanager提交相应任务。YarnBSPJob就是做类似的工作。YarnBSPJob建立了与resourcemanager的代理applicationsManager,并且与之通信。

?

public class YARNBSPJob extends BSPJob {  private static final Log LOG = LogFactory.getLog(YARNBSPJob.class);  private static volatile int id = 0;  private YARNBSPJobClient submitClient;  // 通过submitClient.launchJob提交任务  private BSPClient client; //  private boolean submitted;  //看该job是否提交   private ApplicationReport report; // 应用的汇报信息  private ClientRMProtocol applicationsManager;  // 与resourcemanager通信的代理对象  private YarnRPC rpc;  // 创建一个与rm交互的客户端代理   public YARNBSPJob(HamaConfiguration conf) throws IOException {    super(conf);    submitClient = new YARNBSPJobClient(conf);    YarnConfiguration yarnConf = new YarnConfiguration(conf);    this.rpc = YarnRPC.create(conf);  //创建一个rm代理对象,准备与rm通信    InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(        YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS));    LOG.info("Connecting to ResourceManager at " + rmAddress);    this.applicationsManager = ((ClientRMProtocol) rpc.getProxy(        ClientRMProtocol.class, rmAddress, conf));  //与rm建立代理连接  }  //省略...}
??2.3. 提交应用launchJob

?在初始化RM代理之后,对该任务的相关class,xml等文件进行打jar包操作,然后把这jar包上传到hdfs上面去,再向RM提交该Job。提交BSP任务需要首先向RM请求新的application资源。

GetNewApplicationRequest request = Records        .newRecord(GetNewApplicationRequest.class);    GetNewApplicationResponse response = job.getApplicationsManager()        .getNewApplication(request);    id = response.getApplicationId();    LOG.debug("Got new ApplicationId=" + id);

?再先产生一个ContainerLaunchContext对象,填充一些环境参数(环境变量env map,job文件所在本地路径localResources,appmaster的container启动命令command,设置内存参数capability)之后,然后会构造ApplicationSubmissionContext对象(appid, appname, appmaster container上述配置)构造完再向RM提交应用。

// Create a new ApplicationSubmissionContext    ApplicationSubmissionContext appContext = Records        .newRecord(ApplicationSubmissionContext.class);    // set the ApplicationId    appContext.setApplicationId(this.id);    // set the application name    appContext.setApplicationName(job.getJobName());    // Create a new container launch context for the AM's container    ContainerLaunchContext amContainer = Records        .newRecord(ContainerLaunchContext.class);    // Define the local resources required    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();    // Lets assume the jar we need for our ApplicationMaster is available in    // HDFS at a certain known path to us and we want to make it available to    // the ApplicationMaster in the launched container    //初始化localResources, 省略...    // Set the local resources into the launch context    amContainer.setLocalResources(localResources);    // Set up the environment needed for the launch context    Map<String, String> env = new HashMap<String, String>();    // Assuming our classes or jars are available as local resources in the    // working directory from which the command will be run, we need to append    // "." to the path.    // By default, all the hadoop specific classpaths will already be available    // in $CLASSPATH, so we should be careful not to overwrite it.    String classPathEnv = "$CLASSPATH:./*:";    env.put("CLASSPATH", classPathEnv);    amContainer.setEnvironment(env);    // Construct the command to be executed on the launched container  // 产生能够启动appmaster的命令    String command = "${JAVA_HOME}"        + "/bin/java -cp "        + classPathEnv        + " "        + BSPApplicationMaster.class.getCanonicalName()        + " "        + submitJobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())            .toString() + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR        + "/stdout" + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR        + "/stderr";    LOG.debug("Start command: " + command);    amContainer.setCommands(Collections.singletonList(command));    Resource capability = Records.newRecord(Resource.class);    // we have at least 3 threads, which comsumes 1mb each, for each bsptask and    // a base usage of 100mb    capability.setMemory(3 * job.getNumBspTask()        + getConf().getInt("hama.appmaster.memory.mb", 100));    LOG.info("Set memory for the application master to "        + capability.getMemory() + "mb!");    amContainer.setResource(capability);    // Set the container launch content into the ApplicationSubmissionContext    appContext.setAMContainerSpec(amContainer);    // Create the request to send to the ApplicationsManager    SubmitApplicationRequest appRequest = Records        .newRecord(SubmitApplicationRequest.class); //封装提交请求    appRequest.setApplicationSubmissionContext(appContext);    job.getApplicationsManager().submitApplication(appRequest);//提交application应用请求

?当job向RM提交后,我们就可以向RM获得applicationMaster的运行状态。

GetApplicationReportRequest reportRequest = Records        .newRecord(GetApplicationReportRequest.class);    reportRequest.setApplicationId(id);    while (report == null || report.getHost().equals("N/A")) {      GetApplicationReportResponse reportResponse = job          .getApplicationsManager().getApplicationReport(reportRequest);      report = reportResponse.getApplicationReport();//获得appmaster的运行状态,包括:运行机器,端口等。      try {        Thread.sleep(1000L);      } catch (InterruptedException e) {        LOG.error(            "Got interrupted while waiting for a response report from AM.", e);      }    }
?2.4. Hadoop ResourceManager启动Job

客户端向ResourceManager提交请求后,RM最后通过ClientRMService直接调用RMapplication事件处理器来同步处理,而不是采用yarn提供的异步dispatcher来处理,这样的好处是客户端会立即知道提交的Job是不是初始化完成并启动成功。在这其中,ClientRMService实现了ClientRMProtocol服务,它会加载相关参数(yarn.resourcemanager.address)开启服务端口(默认端口:8040)来接收服务。

?

// 在submitApplication里面,直接调用RMAppManager.handle来同步处理请求rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System          .currentTimeMillis()));
// Create RMApp application = new RMAppImpl(applicationId, rmContext, this.conf, submissionContext.getApplicationName(), user, submissionContext.getQueue(), submissionContext, clientTokenStr, appStore, this.scheduler, this.masterService, submitTime);//省略...// All done, start the RMApp this.rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.START));? // Register event handler for RmAppEvents this.rmDispatcher.register(RMAppEventType.class, new ApplicationEventDispatcher(this.rmContext)); // Register event handler for RmAppAttemptEvents this.rmDispatcher.register(RMAppAttemptEventType.class, new ApplicationAttemptEventDispatcher(this.rmContext));因此RMAppEvent事件会被ApplicationEventDispatcher来消化处理,ApplicationEventDispatcher会通过RMAppImpl.handle来修改该RMAppImpl的状态机为appmaster container节点创建RMAppAttemptImpl。同样,RMAppAttemptImp也维护着一个自有的状态机,根据状态机的当前状态和迁移状态来执行相应的逻辑。当RMAppAttemptImp进入allocated状态时,就会向某个NodeManager提交创建container。private void launch() throws IOException { connect(); //application.getMasterContainer()给出的是已经分配到某NM上运行的container, //它里面有相应NM的nodeId。 ContainerId masterContainerID = application.getMasterContainer().getId(); ApplicationSubmissionContext applicationContext = application.getSubmissionContext(); LOG.info("Setting up container " + application.getMasterContainer() + " for AM " + application.getAppAttemptId()); //准备好启动appmaster container时所需要的参数 ContainerLaunchContext launchContext = createAMContainerLaunchContext(applicationContext, masterContainerID); StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class); request.setContainerLaunchContext(launchContext); containerMgrProxy.startContainer(request); //通过NM代理containerMgrProxy向某一个NM提交启动container的请求 LOG.info("Done launching container " + application.getMasterContainer() + " for AM " + application.getAppAttemptId()); }2.5 Hadoop NodeManager启动Job? //创建container状态机 Container container = new ContainerImpl(getConfig(), this.dispatcher, launchContext, credentials, metrics); ApplicationId applicationID = containerID.getApplicationAttemptId().getApplicationId(); if (context.getContainers().putIfAbsent(containerID, container) != null) { NMAuditLogger.logFailure(launchContext.getUser(), AuditConstants.START_CONTAINER, "ContainerManagerImpl", "Container already running on this node!", applicationID, containerID); throw RPCUtil.getRemoteException("Container " + containerID + " already is running on this node!!"); } //创建一个application Application application = new ApplicationImpl(dispatcher, this.aclsManager, launchContext.getUser(), applicationID, credentials, context); if (null == context.getApplications().putIfAbsent(applicationID, application)) { LOG.info("Creating a new application reference for app " + applicationID); dispatcher.getEventHandler().handle( new ApplicationInitEvent(applicationID, container .getLaunchContext().getApplicationACLs())); } // TODO: Validate the request dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container));

?最后通过org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher.handle(ContainersLauncherEvent)来创建一个新的container进程。

switch (event.getType()) { case LAUNCH_CONTAINER: Application app = context.getApplications().get( containerId.getApplicationAttemptId().getApplicationId()); //准备好启动一个新的Container环境变量和相关参数 ContainerLaunch launch = new ContainerLaunch(getConfig(), dispatcher, exec, app, event.getContainer(), dirsHandler); running.put(containerId, new RunningContainer(containerLauncher.submit(launch), launch)); //通过containerLauncher.submit(launch)创建一个新container进程,这个是一个阻塞进程,直到子进程退出。 break; case CLEANUP_CONTAINER: //省略...}?

?

?3. Hama AppMaster实现

?

?Hama的实现了一个appmaster是org.apache.hama.bsp.BSPApplicationMaster。它的构造函数如下:

?

private BSPApplicationMaster(String[] args) throws Exception {    // 传进来的参数是jobconf的路径名    if (args.length != 1) {      throw new IllegalArgumentException();    }    this.jobFile = args[0];    this.localConf = new YarnConfiguration();    //加入用户提交 的参数    this.jobConf = getSubmitConfiguration(jobFile);    //bsp的名称    this.applicationName = jobConf.get("bsp.job.name",        "<no bsp job name defined>");    if (applicationName.isEmpty()) {      this.applicationName = "<no bsp job name defined>";    }    //获得appAttemptId    this.appAttemptId = getApplicationAttemptId();    //产生一个yarnRPC,该RPC是用于yarn框架,在hama里主要用于产生一个RM代理用于与resourceManager进行通信。    this.yarnRPC = YarnRPC.create(localConf);    this.clock = new SystemClock();    this.startTime = clock.getTime();    this.jobId = new BSPJobID(appAttemptId.toString(), 0);    //appmaster开启一个端口,与客户端进行通信    this.hostname = BSPNetUtils.getCanonicalHostname();    this.clientPort = BSPNetUtils.getFreePort(12000);    // start our synchronization service 启动BSP同步服务,该服务会通过zookeeper server来管理服务的同步,    startSyncServer();    // 启动提供给客户端使用的BSPClient clientServer, 与其它bsp task container进行通信的BSPPeerProtocol taskServer,同时会设置hama.umbilical.address为hostname:taskServerPort    startRPCServers();    /*     * Make sure that this executes after the start the RPC servers, because we     * are readjusting the configuration.     */    //把上面重新改动的配置写到hdfs文件里面    rewriteSubmitConfiguration(jobFile, jobConf);    //把在客户端分好的split list从hdfs上读出来    String jobSplit = jobConf.get("bsp.job.split.file");    splits = null;    if (jobSplit != null) {      DataInputStream splitFile = fs.open(new Path(jobSplit));      try {        splits = BSPJobClient.readSplitFile(splitFile);      } finally {        splitFile.close();      }    }    //产生一个appmaster与resourceManager通信代理     this.amrmRPC = getYarnRPCConnection(localConf);    registerApplicationMaster(amrmRPC, appAttemptId, hostname, clientPort,        "http://localhost:8080");  }
?

?

startRPCServers()方法创建了两个RPC server,这些server中的其中一个是用来与提交任务的clientServer,接口类为BSPClient,它有一个getCurrentSuperStep()方法用于获得该job已经运行到哪一超步了;另外一个taskServer用于与所属这个job的任务节点进行通信,接口类是BSPPeerProtocol,其里面有两个重要的方法:getTask(用于获得该节点的任务信息,用于task节点初始化)和statusUpdate(更新task节点的状态)。

?

当初始化appmaster后,会向RM申请集群的资源信息,RM计算好该分配的资源信息后,会返回一堆NM列表, appmaster会在这些节点列表上启动其相应的BSP Task。

?

@Override  public JobState startJob() throws Exception {    //初始化列表,产生的数量是bsp.peers.num个    this.allocatedContainers = new ArrayList<Container>(numBSPTasks);    while (allocatedContainers.size() < numBSPTasks) {      //向resourceManager申请bsp task container节点的资源。createBSPTaskRequest方法是请求资源ResourceRequest列表;releasedContainers是该job已经不再使用,需要释放的containerId列表。      AllocateRequest req = BuilderUtils.newAllocateRequest(          appAttemptId,          lastResponseID,          0.0f,          createBSPTaskRequest(numBSPTasks - allocatedContainers.size(),              taskMemoryInMb, priority), releasedContainers);      //向RM申请资源,启动numBSPTasks个bsp container任务      AllocateResponse allocateResponse = resourceManager.allocate(req);      AMResponse amResponse = allocateResponse.getAMResponse();      LOG.info("Got response! ID: " + amResponse.getResponseId()          + " with num of containers: "          + amResponse.getAllocatedContainers().size()          + " and following resources: "          + amResponse.getAvailableResources().getMemory() + "mb");      this.lastResponseID = amResponse.getResponseId();      // availableResources = amResponse.getAvailableResources();      this.allocatedContainers.addAll(amResponse.getAllocatedContainers());      LOG.info("Waiting to allocate "          + (numBSPTasks - allocatedContainers.size()) + " more containers...");      Thread.sleep(1000l);    }    //RM同时把numBSPTasks个container任务分配给了这个application。    LOG.info("Got " + allocatedContainers.size() + " containers!");    int id = 0;    for (Container allocatedContainer : allocatedContainers) {      LOG.info("Launching task on a new container." + ", containerId="          + allocatedContainer.getId() + ", containerNode="          + allocatedContainer.getNodeId().getHost() + ":"          + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="          + allocatedContainer.getNodeHttpAddress() + ", containerState"          + allocatedContainer.getState() + ", containerResourceMemory"          + allocatedContainer.getResource().getMemory());      // Connect to ContainerManager on the allocated container      //建立与container所属的NM进行通信。      String cmIpPortStr = allocatedContainer.getNodeId().getHost() + ":"          + allocatedContainer.getNodeId().getPort();      InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);      //NM代理       ContainerManager cm = (ContainerManager) yarnRPC.getProxy(          ContainerManager.class, cmAddress, conf);      //构造一个BSPTaskLauncher用于启动container      BSPTaskLauncher runnableLaunchContainer = new BSPTaskLauncher(id,          allocatedContainer, cm, conf, jobFile, jobId);      launchers.put(id, runnableLaunchContainer);      //启动container。在这方法里调用setupContainer方法把JAVA命令行,环境变量,BSP相关参数都传给NM,调用NM的containerManager服务(org.apache.hadoop.yarn.api.ContainerManager.startContainer(StartContainerRequest))去启动一个子进程BSPRunner container,并且获得该container的状态信息。      runnableLaunchContainer.start();      completionQueue.add(runnableLaunchContainer);      id++;    }    LOG.info("Waiting for tasks to finish...");    state = JobState.RUNNING;    int completed = 0;    while (completed != numBSPTasks) {      for (BSPTaskLauncher task : completionQueue) {        BSPTaskStatus returnedTask = task.poll();  //从每个runnableLaunchContainer中获得状态信息。        // if our task returned with a finished state        if (returnedTask != null) {          if (returnedTask.getExitStatus() != 0) {            LOG.error("Task with id "" + returnedTask.getId() + "" failed!");            state = JobState.FAILED;            return state;          } else {            LOG.info("Task "" + returnedTask.getId()                + "" sucessfully finished!");            completed++;            LOG.info("Waiting for " + (numBSPTasks - completed)                + " tasks to finish!");          }          cleanupTask(returnedTask.getId());        }      }      Thread.sleep(1000L);    }    state = JobState.SUCCESS;    return state;  }
?

4. Container启动流程在Hama yarn框架的实现里,appmaster是一个特殊的container,它启动的方式跟普通的container一样,事实上其它的yarn实现也是通过这种流程。如下列出两类container进程的启动流程。
    普通container:Appmaster先向RM申请资源,然后appmaster向构造java命令等环境参数去通知NM去fork一个子进程setup任务节点。appmaster container:client端先构造java命令等环境参数,然后去向RM申请appmaster container资源,RM通知NM?fork一个子进程appmaster,启动这个appmaster的节点信自己会返回给RM和client。

?

?

?

?

?

?

?

?

热点排行