首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

spark源码分析-spark的任务调度(standalone方式)

2013-12-26 
spark源码分析--spark的任务调度(standalone模式)原创,转载请注明出处http://baishuo491.iteye.com/blog/1

spark源码分析--spark的任务调度(standalone模式)
原创,转载请注明出处  http://baishuo491.iteye.com/blog/1994026 ,作者邮箱:vc_java@hotmail.com,新浪微博:爱看历史的码农--白硕

在sparkContext的建立过程中(更细致的说是clientActor的preStart回调函数中),会向master发送RegisterApplication消息

master ! RegisterApplication(appDescription)  当master收到RegisterApplication请求后:  app = addApplication(description, sender)  .....  waitingApps += app  .....  schedule()
  通过传入的(appDescription)和发送者,创建一个addApplication,然后把app加入到等待队列waitingApps中,之后再调用schedule函数进行调度 
  再来看看schedule的流程:
    当等待队列waitingApps里有数据的时候,对waitingApps里的每个元素app做如下操作:
      从已经注册的works里面,选择合适的works列表usableWorkers
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE).filter(canUse(app, _)).sortBy(_.coresFree).reverse
从usableWorkers的空闲cpu中,选择合适数量的cpu,为当前app进行分配
      这之后调用launchExecutor,并把app的状态设置为running
launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome);app.state = ApplicationState.RUNNING
 
再来看看launchExecutor这个函数
先在每个传入的workerInfo参数里面,记录当前的app,和已经消耗的cpu和内存
      接着向worker发送LaunchExecutor消息,向client发送ExecutorAdded消息
    
 worker.addExecutor(exec)      worker.actor ! LaunchExecutor(      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)      exec.application.driver ! ExecutorAdded(      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
     
  我们再来看看worker收到LaunchExecutor后的执行步骤:
创建一个实际执行任务的ExecutorRunner并启动它
      更新已经使用的cpu和内存数量
      然后向master发送ExecutorStateChanged消息
 val manager = new ExecutorRunner(        appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)      executors(appId + "/" + execId) = manager      manager.start()      coresUsed += cores_      memoryUsed += memory_      master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)
     
ExecutorRunner的start的函数,主要任务就是启动一个java后台线程,这个线程执行fetchAndRunExecutor函数,这个函数的主要流程如下:
      拼接出一个字符串,其内容是用JAVA_HOME\bin\java命令,执行一个类StandaloneExecutorBackend
      这个执行过程不在当前的jvm下执行。而是通过ProcessBuilder新建了一个jvm进程,单独执行的,相当于就是启动了一个StandaloneExecutorBackend实例
val command = buildCommandSeq()      val builder = new ProcessBuilder(command: _*).directory(executorDir)      val env = builder.environment()      for ((key, value) <- appDesc.command.environment) {        env.put(key, value)      }      ......      process = builder.start()

看看StandaloneExecutorBackend做了什么:
      创建了一个actor,在这个actor的在构造过程中,就是向客户端的driver发送RegisterExecutor(executorId, hostPort, cores)消息,然后开始等待消息     
上面发送的那个消息,会被driver端的StandaloneSchedulerBackend接受到
      它回复一个RegisteredExecutor(sparkProperties)消息,在修改了一系列的资源记录后,调用makeOffers()函数
      makeOffers()的作用是在拼装好要执行的任务列表tasks之后,把一个重要的变量hasLaunchedTask设置成true(借助scheduler.resourceOffers函数)
      这之后,向StandaloneExecutorBackend发送LaunchTask(task)消息
      这里可以看到RegisteredExecutor和LaunchTask消息都是发送给了StandaloneExecutorBackend,我们来看看它收到消息后的动作    
      在StandaloneExecutorBackend收到RegisteredExecutor(sparkProperties)消息后,会创建一个excutor变量,里面有创建了一个ThreadPoolExecutor线程池,名叫threadPool
      当它再收到LaunchTask(task)消息之后,就通过executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)函数,调用刚刚创建的threadPool线程池,执行传入的task任务
     
threadPool.execute(new TaskRunner(context, taskId, serializedTask))
      而TaskRunner就是这个任务真正被执行的地方

热点排行