Spark source code analysis -- Spark task scheduling (standalone mode)
Original content; please credit the source when reposting: http://baishuo491.iteye.com/blog/1994026. Author email: vc_java@hotmail.com, Sina Weibo: 爱看历史的码农--白硕
During the creation of a SparkContext (more precisely, in the preStart callback of the clientActor), a RegisterApplication message is sent to the master.
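For orientation, this registration is triggered simply by constructing a SparkContext against a standalone master URL in the driver program. A minimal sketch is shown below; the master URL spark://master:7077 and the app name are placeholders for this example:

    import org.apache.spark.SparkContext

    object RegisterDemo {
      def main(args: Array[String]): Unit = {
        // Constructing the SparkContext starts the client actor, which registers
        // this application with the standalone master named in the URL.
        val sc = new SparkContext("spark://master:7077", "RegisterDemo")
        sc.parallelize(1 to 100).count()  // any job will then run on the executors allocated below
        sc.stop()
      }
    }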
On the client side this is a plain actor send:

    master ! RegisterApplication(appDescription)

When the master receives the RegisterApplication message, it handles it roughly as follows:

    app = addApplication(description, sender)
    .....
    waitingApps += app
    .....
    schedule()

That is, from the appDescription and the sender passed in, addApplication builds an application object; the app is then appended to the waiting queue waitingApps, after which schedule() is called to do the actual scheduling.
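The following self-contained sketch models just this bookkeeping. ApplicationDescription, ApplicationInfo, addApplication and schedule here are simplified stand-ins with made-up fields, not Spark's real classes; the real Master is an Akka actor and its schedule() performs the worker selection shown next:

    import scala.collection.mutable.ArrayBuffer

    case class ApplicationDescription(name: String, maxCores: Int, memoryPerSlave: Int)
    case class ApplicationInfo(id: String, desc: ApplicationDescription)

    object MiniMaster {
      private val waitingApps = new ArrayBuffer[ApplicationInfo]()
      private var nextAppId = 0

      // Mirrors addApplication: build an application record from the submitted description.
      def addApplication(desc: ApplicationDescription): ApplicationInfo = {
        nextAppId += 1
        ApplicationInfo(s"app-$nextAppId", desc)
      }

      // Mirrors the RegisterApplication handler: register the app, queue it, then schedule.
      def registerApplication(desc: ApplicationDescription): Unit = {
        val app = addApplication(desc)
        waitingApps += app
        schedule()
      }

      private def schedule(): Unit =
        waitingApps.foreach(app => println(s"would try to allocate cores for ${app.id}"))

      def main(args: Array[String]): Unit =
        registerApplication(ApplicationDescription("demo", maxCores = 4, memoryPerSlave = 512))
    }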
Inside schedule(), the master first determines which workers can host this app:

    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(canUse(app, _)).sortBy(_.coresFree).reverse

From the free cores of these usableWorkers, an appropriate number of cores is then chosen and assigned to the current app.
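The default assignment strategy spreads the app's cores across the usable workers in round-robin fashion. Below is a self-contained sketch of that idea; the coresNeeded/coresFree arrays are simplified stand-ins for Spark's WorkerInfo bookkeeping, and the real schedule() also checks memory via canUse, so this is an illustration of the allocation loop rather than the actual Master code:

    object SpreadOutAssignment {
      // Hand out at most coresNeeded cores, capped by the total free cores,
      // one at a time, round-robin across the workers.
      def assign(coresNeeded: Int, coresFree: Array[Int]): Array[Int] = {
        val assigned = Array.fill(coresFree.length)(0)
        var toAssign = math.min(coresNeeded, coresFree.sum)
        var pos = 0
        while (toAssign > 0) {
          // Skip workers that have no free core left to give.
          if (coresFree(pos) - assigned(pos) > 0) {
            assigned(pos) += 1
            toAssign -= 1
          }
          pos = (pos + 1) % coresFree.length
        }
        assigned
      }

      def main(args: Array[String]): Unit = {
        // Three workers with 4, 2 and 8 free cores; the app still needs 6 cores.
        println(assign(6, Array(4, 2, 8)).mkString(", "))  // prints: 2, 2, 2
      }
    }

Spreading an app's executors over many workers, rather than packing them onto as few workers as possible, is Spark's default standalone policy; the packing strategy exists as an alternative.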
For each worker that received an allocation, the master then launches an executor on it and marks the application as running:

    launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome)
    app.state = ApplicationState.RUNNING
launchExecutor records the executor on the chosen worker, tells that worker's actor to start it, and notifies the application's driver that an executor has been added:

    worker.addExecutor(exec)
    worker.actor ! LaunchExecutor(
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
    exec.application.driver ! ExecutorAdded(
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
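Both sends are ordinary fire-and-forget Akka messages. The toy sketch below only illustrates that pattern: the LaunchExecutor and ExecutorAdded case classes here carry fewer fields than the real messages, the worker and driver actors are hypothetical stubs, and it assumes the 2013-era Akka 2.x API that Spark bundled at the time:

    import akka.actor.{Actor, ActorSystem, Props}

    case class LaunchExecutor(appId: String, execId: Int, cores: Int, memory: Int)
    case class ExecutorAdded(execId: Int, workerId: String, cores: Int, memory: Int)

    class WorkerStub extends Actor {
      def receive = {
        case LaunchExecutor(appId, execId, cores, mem) =>
          println(s"worker: starting executor $appId/$execId with $cores cores, $mem MB")
      }
    }

    class DriverStub extends Actor {
      def receive = {
        case ExecutorAdded(execId, workerId, cores, mem) =>
          println(s"driver: executor $execId added on $workerId")
      }
    }

    object LaunchExecutorSketch extends App {
      val system = ActorSystem("sketch")
      val worker = system.actorOf(Props[WorkerStub], "worker")
      val driver = system.actorOf(Props[DriverStub], "driver")
      worker ! LaunchExecutor("app-1", 0, 2, 512)    // master -> worker
      driver ! ExecutorAdded(0, "worker-1", 2, 512)  // master -> driver
      Thread.sleep(500)
      system.shutdown()
    }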
On the worker side, the handler for LaunchExecutor creates an ExecutorRunner, starts it, updates the worker's resource accounting, and reports the new executor's state back to the master:

    val manager = new ExecutorRunner(
      appId, execId, appDesc, cores_, memory_, self, workerId, host,
      new File(execSparkHome_), workDir)
    executors(appId + "/" + execId) = manager
    manager.start()
    coresUsed += cores_
    memoryUsed += memory_
    master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)
ExecutorRunner, in turn, builds the command line for the executor backend and starts it as a separate process via ProcessBuilder, copying the application's environment variables into the child process:

    val command = buildCommandSeq()
    val builder = new ProcessBuilder(command: _*).directory(executorDir)
    val env = builder.environment()
    for ((key, value) <- appDesc.command.environment) {
      env.put(key, value)
    }
    ......
    process = builder.start()
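The same launch pattern can be reproduced outside Spark. In the sketch below, the command ("java", "-version"), the working directory and the EXAMPLE_KEY environment entry are placeholders invented for the demo; it only shows how command, directory and environment are wired together before start():

    import java.io.File

    object ProcessLaunchSketch {
      def main(args: Array[String]): Unit = {
        val command = Seq("java", "-version")         // stand-in for buildCommandSeq()
        val workDir = new File(".")                   // stand-in for executorDir
        val extraEnv = Map("EXAMPLE_KEY" -> "value")  // stand-in for appDesc.command.environment

        val builder = new ProcessBuilder(command: _*).directory(workDir)
        builder.redirectErrorStream(true)             // merge stderr into stdout for the demo
        val env = builder.environment()
        for ((key, value) <- extraEnv) env.put(key, value)

        val process = builder.start()
        scala.io.Source.fromInputStream(process.getInputStream).getLines().foreach(println)
        process.waitFor()
      }
    }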
Once that executor process is up and tasks start arriving from the driver, each task is handed to the executor's thread pool:

    threadPool.execute(new TaskRunner(context, taskId, serializedTask))

TaskRunner is where the task is actually executed.
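To make this last step concrete, here is a heavily simplified, self-contained sketch of what a TaskRunner-style Runnable does: deserialize the task bytes, run the task, and report the result. DemoTask, TaskRunnerSketch and the println standing in for the status update are made-up illustrations, not Spark's real Task, TaskRunner or statusUpdate:

    import java.io._
    import java.nio.ByteBuffer
    import java.util.concurrent.Executors

    // A trivially serializable "task" standing in for Spark's Task[T].
    case class DemoTask(x: Int, y: Int) { def run(): Int = x + y }

    class TaskRunnerSketch(taskId: Long, serializedTask: ByteBuffer) extends Runnable {
      override def run(): Unit = {
        // Deserialize the task bytes, execute the task, then report the result
        // (the println is a stand-in for the status update sent back to the driver).
        val in = new ObjectInputStream(new ByteArrayInputStream(serializedTask.array()))
        val task = in.readObject().asInstanceOf[DemoTask]
        println(s"task $taskId finished with result ${task.run()}")
      }
    }

    object TaskRunnerDemo {
      def serialize(task: DemoTask): ByteBuffer = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(task)
        out.close()
        ByteBuffer.wrap(bytes.toByteArray)
      }

      def main(args: Array[String]): Unit = {
        val threadPool = Executors.newCachedThreadPool()
        threadPool.execute(new TaskRunnerSketch(1L, serialize(DemoTask(2, 3))))
        threadPool.shutdown()
      }
    }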