TaskTracker LaunchTask过程与CleanTask过程
TaskTracker实现了Runnable接口,他的run函数的主要逻辑在offerService函数里,是一个循环,保持和JobTracker的心跳通信,如果TaskTracker当前有空余的Task槽位(Task的最大槽位数由TaskTracker上的配置文件中的mapred.tasktracker.map.tasks.maximum及mapred.tasktracker.reduce.tasks.maximum来决定的,在mapred-site.xml中配置),则在心跳消息里附带上Task任务申请信息(心跳消息的封装在 transmitHeartBeat()函数里),如果JobTracker收到此类心跳,且有未分配的Task,则会将适当的Task(这里在JobTracker端有一个Task调度的过程)通过心跳响应回给TaskTracker,
TaskTrackerAction[] actions = heartbeatResponse.getActions();
action有以下几类: LAUNCH_TASK,KILL_TASK,KILL_JOB, REINIT_TRACKER,COMMIT_TASK
actions是TaskTracker收到响应后根据对应的类型创建的,并不是JobTracker直接传过来的;
正常情况下,收到的第一个action都是LaunchTaskAction,根据任务类型,将该action放到对应的TaskLauncher中去,假设当前是一个Map Task,则mapLauncher.addToTaskQueue(action);这个函数里会生成一个TaskInProgresstip,将tip放置到任务启动队列:ListtasksToLaunch,并通知其他线程(实际是TaskLaucher这个线程)有任务需要处理,tasksToLaunch.notifyAll(),这将导致run函数里的等待返回;
TaskLaucher本身是一个线程,他的run函数不断从tasksToLaunch里获取任务进行处理,如果没有,则等待,流程如下:
startNewTask的主要逻辑位于localizeJob函数里,他所做的工作如下:
具体举例看一下这些路径的关系:
假设当前taskracker的目录:/home/xxx/hadoop/tmp/mapred/local/taskTracker
下面的/home/xxx/hadoop/tmp/mapred/local用~代替
则有jobcache目录 : ~/taskTracker/jobcache;
当前job路径: ~/taskTracker/jobcache/job_201202101537_0001 (job.xml下载到这里)
jar包路径 : ~/taskTracker/jobcache/job_201202101537_0001/jars (jar包下载并解压缩到这里)
work路径 :~/taskTracker/jobcache/job_201202101537_0001/work
当前task路径:
~/taskTracker/jobcache/job_201202101537_0001/attempt_201202101537_0001_m_000002_0
当前task的work路径 :
~/taskTracker/jobcache/job_201202101537_0001/attempt_201202101537_0001_m_000002_0/work
这里还生成与该task相关的job.xml,放到当前task路径下
TaskRunner继承Thread类,本身是一个线程类,在他的run函数里真正开始启动任务执行进程,主要执行步骤如下:
子JVM生成规则如下:
//Check whether there is a free slot to start a new JVM.
//,or, Kill a (idle) JVM and launch a new one
//When this method is called, we *must*
// (1) spawn a new JVM (if we are below the max)
未达到上限,新建
// (2) find an idle JVM (that belongs to the same job), or,
重用同job的空闲且运行的任务数还没达到最大JVM(一个子JVM可运行多个任务,最大任务数由jobtracker上mapred-site.xml里的mapred.job.reuse.JVM.num.tasks配置,注意:子JVM并不是并行运行多个任务的,他是串行的去taskTracker上取可运行的任务的)
// (3) kill an idle JVM (from a different job)
kill不同job的空闲JVM后新建(实际上相同job的JVM也会被kill)
// (the order of return is in the order above)
//Cases when a JVM is killed:
// (1) the JVM under consideration belongs to the same job
// (passed in the argument). In this case, kill only when
// the JVM ran all the tasks it was scheduled to run (in terms
// of count).
相同job,kill已执行完所有task的JVM
// (2) the JVM under consideration belongs to a different job and is
// currently not busy
不同job,kill空闲的JVM
//But in both the above cases, we see if we can assign the current
//task to an idle JVM (hence we continue the loop even on a match)
重用调用setRunningTaskForJVM(JVMRunner.JVMId, t)将重用的JVM id和task建立映射后返回。
除重用的情况,新建和kill掉新建需要执行spawnNewJVM(),会新建一个JVMRunner并启动。也会调setRunningTaskForJVM(),此外还会调JVMIdToRunner.put(JVMRunner.JVMId, JVMRunner)将JVM id和JVMRunner对象建立映射。
主要调用DefaultTaskController. launchTask(),会在finally中调用kill()和updateOnJVMExit(JVMId, exitCode)。
将子JVM的所有环境信息写入相应path下的sh脚本,通过runtime执行该脚本启动Child进程并等待获得返回值。
从脚本传入的参数生成相关task配置信息和工作目录,调用TaskTracker.getTask(),通过JVMid向JVMManager获取task具体信息(包含在JVMTask中),并将进程id在JVMManager中和JVMid建立映射。getTask()时如果JVMManager的JVMIdToRunner或TaskTracker的runningJobs或JVMManager.getTaskForJVM不包含此JVMid或此id相应对象,将返回包含空task信息的JVMTask。
TaskTrackerAction[] actions = heartbeatResponse.getActions();
……
tasksToCleanup.put(action);
获取到KILL_TASK类型的action,放入tasksToCleanup。
taskCleanupThread一直循环着处理KillJobAction和KillTaskAction.
purgeTask()中将指定task从相应job中移除,再调用tip.jobHasFinished
对特定状态的task进行kill。
对所有task进行cleanup操作。
对特定task的TaskRunner进行kill。
设置所有task的完成时间,从MemoryManager中移除task,释放slot。
根据JVMIdToPid查找相应JVMid的pid,找到,通过kill命令进行kill;否则无操作。
如果同一Task的launch和clean操作间隔很短,可能会出现TaskTracker对Child进程生命周期失去控制的情况。比如Child进程已启动,但还未执行到getTask()从而还没有向TaskTracker报告其进程id,而在此之前TaskTracker已收到kill消息,进行了kill操作。此时kill操作实际上什么也没做(因为没有pid来执行kill命令),而且还清除了jvmid等相关信息,完全放弃了对Child进程的控制。一旦Child因为某些原因未能正常结束而是阻塞,就会出现Child进程数量超过配置的情况。而且随着Child进程数量的增加,系统资源会越来越少最终不可用(jhdfs压力测试就出现了这种情况)。考虑可以通过脚本的形式定期对Child进程进行检验,kill掉阻塞的Child。
参考链接:
http://blog.csdn.net/zhangxinfa/article/details/7477501