
Hadoop Source Code Walkthrough - How JobTracker Handles HeartBeat

2013-12-15 

The JobTracker receives heartbeats from TaskTrackers and processes them. Without further ado, here is the source code:

    public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                    boolean restarted,
                                                    boolean initialContact,
                                                    boolean acceptNewTasks,
                                                    short responseId)


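1 First, check whether the heartbeat comes from a host in the JobTracker's hosts list; if it does not, an exception is thrown.
  In other words, if the tracker is not in the hosts list, or is in the excluded-hosts list, heartbeat processing stops here.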
 
  return (inHostsList(status) && !inExcludedHostsList(status)); 
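The include/exclude rule above can be sketched in plain Java. This is a minimal illustration and not Hadoop's code. HostFilter and checkHeartbeat are hypothetical names. Hosts are plain strings rather than TaskTrackerStatus. Treating an empty include list as "allow every host" is an assumption of this sketch.

```java
import java.util.Set;

// Hypothetical sketch of the step 1 host check (not Hadoop's implementation).
// Assumption: an empty include list admits every host.
final class HostFilter {
    private final Set<String> includedHosts;
    private final Set<String> excludedHosts;

    HostFilter(Set<String> includedHosts, Set<String> excludedHosts) {
        this.includedHosts = Set.copyOf(includedHosts);
        this.excludedHosts = Set.copyOf(excludedHosts);
    }

    // Counterpart of inHostsList(status); empty list means "allow all" (assumption).
    boolean inHostsList(String host) {
        return includedHosts.isEmpty() || includedHosts.contains(host);
    }

    // Counterpart of inExcludedHostsList(status).
    boolean inExcludedHostsList(String host) {
        return excludedHosts.contains(host);
    }

    // Same shape as the expression in step 1.
    boolean accept(String host) {
        return inHostsList(host) && !inExcludedHostsList(host);
    }

    // Rejects a heartbeat from a disallowed host; the exception type is this sketch's choice.
    void checkHeartbeat(String host) {
        if (!accept(host)) {
            throw new IllegalArgumentException("Disallowed tracker host: " + host);
        }
    }
}
```

For example, suppose the include list is {node1, node2} and the exclude list is {node2}. Then only node1 is accepted, and a heartbeat from node3 is rejected.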

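2 Check whether the tracker is on the blacklist, the graylist, or the default list, and remove it from these lists.
  The blacklist and graylist are mainly part of Hadoop's fault-tolerance mechanism. I won't go into detail here, since they deserve an article of their own.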
 
  faultyTrackers.markTrackerHealthy(status.getHost());
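Here is a minimal sketch of what "remove the tracker from the fault lists" amounts to. It assumes each list is a simple set of host names. FaultyTrackerRegistry is a hypothetical class. Hadoop's real fault-tracking state (fault counts, time windows) is richer and is not modeled here.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the faultyTrackers object used in step 2.
// Only the "mark healthy = drop from every list" idea is modeled.
final class FaultyTrackerRegistry {
    private final Set<String> blacklist = new HashSet<>();
    private final Set<String> graylist = new HashSet<>();

    void blacklist(String host) { blacklist.add(host); }
    void graylist(String host) { graylist.add(host); }

    boolean isBlacklisted(String host) { return blacklist.contains(host); }
    boolean isGraylisted(String host) { return graylist.contains(host); }

    // Remove the host from every fault list so it is treated as healthy again.
    void markTrackerHealthy(String host) {
        blacklist.remove(host);
        graylist.remove(host);
    }
}
```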

3 Look up the previous heartbeat response by trackerName
 
    HeartbeatResponse prevHeartbeatResponse =
        trackerToHeartbeatResponseMap.get(trackerName);

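4 If there is no previous heartbeat response, there are two cases. If the JobTracker has restarted, this is the tracker's first heartbeat to the new JobTracker, so the tracker is removed from the recovery manager's records (recoveryManager.unMarkTracker). Otherwise, the TaskTracker is told to reinitialize.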
   
    if (prevHeartbeatResponse == null) {
      // This is the first heartbeat from the old tracker to the newly
      // started JobTracker
      if (hasRestarted()) {
        addRestartInfo = true;
        // inform the recovery manager about this tracker joining back
        recoveryManager.unMarkTracker(trackerName);
      } else {
        // Jobtracker might have restarted but no recovery is needed
        // otherwise this code should not be reached
        LOG.warn("Serious problem, cannot find record of 'previous' " +
                 "heartbeat for '" + trackerName +
                 "'; reinitializing the tasktracker");
        return new HeartbeatResponse(responseId,
            new TaskTrackerAction[] {new ReinitTrackerAction()});
      }
    }
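The no-previous-response branch boils down to a two-way decision. The sketch below is hypothetical. FirstContactHandler and Outcome are illustrative names. A set of trackers awaiting recovery stands in for hasRestarted(), recoveryManager.unMarkTracker() and ReinitTrackerAction.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of step 4's "no previous response" branch.
final class FirstContactHandler {
    enum Outcome { RECOVERED, REINITIALIZE }

    private final boolean jobTrackerRestarted;
    // Trackers the recovery logic still expects to hear from (assumption of this sketch).
    private final Set<String> trackersAwaitingRecovery;

    FirstContactHandler(boolean jobTrackerRestarted, Set<String> trackersAwaitingRecovery) {
        this.jobTrackerRestarted = jobTrackerRestarted;
        this.trackersAwaitingRecovery = new HashSet<>(trackersAwaitingRecovery);
    }

    Outcome onMissingPreviousResponse(String trackerName) {
        if (jobTrackerRestarted) {
            // An old tracker is talking to a freshly restarted JobTracker:
            // record that it is back (like recoveryManager.unMarkTracker).
            trackersAwaitingRecovery.remove(trackerName);
            return Outcome.RECOVERED;
        }
        // No restart and no record of this tracker: it must reinitialize
        // (the real code replies with a ReinitTrackerAction).
        return Outcome.REINITIALIZE;
    }

    boolean isAwaitingRecovery(String trackerName) {
        return trackersAwaitingRecovery.contains(trackerName);
    }
}
```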

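   Next, if the responseId shows that the heartbeat is a resend (it does not match the ID of the previous response), the duplicate heartbeat is ignored and the previous response is sent again.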
  
    if (prevHeartbeatResponse.getResponseId() != responseId) {
      LOG.info("Ignoring 'duplicate' heartbeat from '" +
          trackerName + "'; resending the previous 'lost' response");
      return prevHeartbeatResponse;
    }
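The response-ID check is what makes heartbeats safe to retry. The sketch below relies on two assumptions. First, as the code above suggests, the TaskTracker sends the ID of the last response it received. Second, a freshly built response gets that ID plus one. ResponseIdTracker, Response and the payload strings are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the response-ID handshake in steps 3-4.
// Response stands in for HeartbeatResponse.
final class ResponseIdTracker {
    record Response(short id, String payload) {}

    private final Map<String, Response> lastResponse = new HashMap<>();
    private int processedCount = 0; // heartbeats that were actually processed

    // Seed a previous response, as if earlier heartbeats had already happened.
    void seed(String tracker, short id) {
        lastResponse.put(tracker, new Response(id, "seed"));
    }

    Response heartbeat(String tracker, short responseId) {
        Response prev = lastResponse.get(tracker);
        if (prev == null) {
            throw new IllegalStateException("no previous response for " + tracker);
        }
        if (prev.id() != responseId) {
            // The tracker never received prev (it was lost), so resend it unchanged.
            return prev;
        }
        processedCount++;
        // Assumption: the new response ID is the received ID plus one.
        Response next = new Response((short) (responseId + 1), "actions#" + processedCount);
        lastResponse.put(tracker, next);
        return next;
    }

    int processedCount() { return processedCount; }
}
```

Suppose response 6 is lost. The tracker retries with ID 5 and gets response 6 back unchanged, so no work is done twice.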

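5 Process the heartbeat. processHeartbeat first calls updateTaskTrackerStatus. It then adds a tasktracker that had been forgotten (lost) back to the queue, updates the task statuses, and updates the node health status.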
  
    private synchronized boolean processHeartbeat(
                                 TaskTrackerStatus trackerStatus,
                                 boolean initialContact,
                                 long timeStamp) throws UnknownHostException {
      // Most of the heartbeat work happens here; not analyzed in detail
    }
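To make step 5 concrete, here is a hypothetical and much-simplified registry. It records each tracker's latest status and the time it was last heard from. It also reports when a tracker appears that it did not know about, whether new or previously forgotten. TrackerRegistry, TrackerStatus and forget() are illustrative names. The real processHeartbeat also updates task statuses, which this sketch leaves out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the bookkeeping in step 5 (not Hadoop's classes).
final class TrackerRegistry {
    // Health is carried as part of the status in this sketch.
    record TrackerStatus(String name, int runningTasks, boolean healthy) {}

    private final Map<String, TrackerStatus> statuses = new HashMap<>();
    private final Map<String, Long> lastSeen = new HashMap<>();

    // Returns true if the tracker was (re)added, i.e. it was new or had been forgotten.
    boolean processHeartbeat(TrackerStatus status, long timeStamp) {
        boolean seenBefore = statuses.containsKey(status.name());
        statuses.put(status.name(), status);    // store the latest status
        lastSeen.put(status.name(), timeStamp); // remember when we last heard from it
        return !seenBefore;
    }

    // Forget a tracker, e.g. after its heartbeats time out (assumption for the demo).
    void forget(String name) {
        statuses.remove(name);
        lastSeen.remove(name);
    }

    TrackerStatus status(String name) { return statuses.get(name); }
    Long lastSeen(String name) { return lastSeen.get(name); }
}
```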

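6 Check whether new tasks should be launched, and queue them for launch if so. This happens only when scheduling is allowed, the tracker accepts new tasks, and the tracker is not blacklisted. Setup/cleanup tasks are taken first; only if there are none does the task scheduler assign tasks. Each task then gets a LaunchTaskAction.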
 
    if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
      TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
      if (taskTrackerStatus == null) {
        LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
      } else {
        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
        if (tasks == null ) {
          tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
        }
        // Add the tasks
        if (tasks != null) {
          for (Task task : tasks) {
            expireLaunchingTasks.addNewTask(task.getTaskID());
            if (LOG.isDebugEnabled()) {
              LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
            }
            actions.add(new LaunchTaskAction(task));
          }
        }
      }
    }
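The assignment logic in step 6 can be reduced to the sketch below. Task IDs are plain strings and the scheduler is a Supplier. TaskAssigner and its launching set are hypothetical; the set stands in for expireLaunchingTasks. A single canSchedule flag replaces the three conditions in the real code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch of step 6: setup/cleanup tasks win; otherwise ask the scheduler.
final class TaskAssigner {
    record LaunchTaskAction(String taskId) {}

    // Stand-in for expireLaunchingTasks: IDs of tasks launched but not yet confirmed.
    private final Set<String> launching = new HashSet<>();

    List<LaunchTaskAction> assign(boolean canSchedule,
                                  List<String> setupAndCleanupTasks,   // may be null
                                  Supplier<List<String>> scheduler) {  // may return null
        List<LaunchTaskAction> actions = new ArrayList<>();
        if (!canSchedule) {
            return actions; // e.g. recovery in progress, tracker full, or blacklisted
        }
        List<String> tasks = setupAndCleanupTasks;
        if (tasks == null) {
            tasks = scheduler.get(); // only consulted when there is no setup/cleanup work
        }
        if (tasks != null) {
            for (String taskId : tasks) {
                launching.add(taskId);                    // track for launch-timeout checks
                actions.add(new LaunchTaskAction(taskId)); // tell the tracker to run it
            }
        }
        return actions;
    }

    boolean isLaunching(String taskId) { return launching.contains(taskId); }
}
```

Passing the scheduler as a Supplier keeps it lazy. As in the original code, the scheduler is not asked at all when setup/cleanup tasks are available.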

7 Check for tasks that should be killed
    List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
    if (killTasksList != null) {
      actions.addAll(killTasksList);
    }

8 Check for jobs that need cleanup on this tracker
 
    List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
    if (killJobsList != null) {
      actions.addAll(killJobsList);
    }

9 Check whether any task outputs are ready to be saved (committed)
  
    List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
    if (commitTasksList != null) {
      actions.addAll(commitTasksList);
    }
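Steps 7 to 9 share one pattern: a helper returns either a list of actions or null, and only non-null lists are appended. A small hypothetical helper captures it. ActionCollector and Action are illustrative names, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the "append if non-null" pattern used in steps 7-9.
final class ActionCollector {
    record Action(String kind, String target) {}

    private final List<Action> actions = new ArrayList<>();

    // Same null-check-then-addAll shape as the three snippets above.
    void addIfPresent(List<Action> maybeActions) {
        if (maybeActions != null) {
            actions.addAll(maybeActions);
        }
    }

    // Read-only snapshot of everything collected so far.
    List<Action> actions() { return List.copyOf(actions); }
}
```

Centralizing the null check in one method avoids repeating the same if block for every action source.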

10 Compute the interval until the next heartbeat, then attach all the collected actions to the response
  
    int nextInterval = getNextHeartbeatInterval();
    response.setHeartbeatInterval(nextInterval);
    response.setActions(
        actions.toArray(new TaskTrackerAction[actions.size()]));
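One plausible way to compute such an interval is to scale it with cluster size, so that the JobTracker handles roughly a fixed number of heartbeats per second, and to apply a minimum floor. The formula shape and all parameter values below are assumptions for illustration. Check getNextHeartbeatInterval() in your Hadoop version for the exact rule. HeartbeatInterval is a hypothetical class.

```java
// Hypothetical sketch of a cluster-size-based heartbeat interval.
// Assumption: interval = max(ceil(clusterSize / heartbeatsPerSecond) seconds, floor).
final class HeartbeatInterval {
    static int nextIntervalMillis(int clusterSize,
                                  int heartbeatsPerSecond,
                                  int minIntervalMillis) {
        // Seconds needed to hear from every tracker once at the target rate.
        double seconds = Math.ceil((double) clusterSize / heartbeatsPerSecond);
        int interval = (int) (1000 * seconds);
        // Never let trackers report more often than the floor allows.
        return Math.max(interval, minIntervalMillis);
    }
}
```

Take 100 heartbeats per second and a 3000 ms floor. A 550-tracker cluster gets ceil(5.5) = 6 seconds, i.e. 6000 ms, while a 50-tracker cluster stays at the 3000 ms floor.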

11 Store the response in trackerToHeartbeatResponseMap, and remove the tasks on this tracker that have been marked for removal
  
    // Update the map
    trackerToHeartbeatResponseMap.put(trackerName, response);
    // Remove the tasks marked for removal on this tracker
    removeMarkedTasks(trackerName);
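Step 11 can be sketched as two bookkeeping updates at the end of the heartbeat. The names are hypothetical and responses are plain strings. The sketch only assumes what the step describes: the latest response is stored per tracker, and marked tasks are purged afterwards.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of step 11 (not Hadoop's data structures).
final class TrackerBookkeeping {
    private final Map<String, String> lastResponseByTracker = new HashMap<>();
    private final Map<String, Set<String>> tasksByTracker = new HashMap<>();
    private final Map<String, Set<String>> markedByTracker = new HashMap<>();

    void addTask(String tracker, String taskId) {
        tasksByTracker.computeIfAbsent(tracker, t -> new HashSet<>()).add(taskId);
    }

    void markForRemoval(String tracker, String taskId) {
        markedByTracker.computeIfAbsent(tracker, t -> new HashSet<>()).add(taskId);
    }

    void finishHeartbeat(String tracker, String response) {
        // Keep the response so a retried heartbeat can be answered with it.
        lastResponseByTracker.put(tracker, response);
        // Purge the tasks marked for removal on this tracker, then clear the marks.
        Set<String> marked = markedByTracker.remove(tracker);
        if (marked != null) {
            Set<String> tasks = tasksByTracker.get(tracker);
            if (tasks != null) {
                tasks.removeAll(marked);
            }
        }
    }

    String lastResponse(String tracker) { return lastResponseByTracker.get(tracker); }

    Set<String> tasks(String tracker) {
        return Set.copyOf(tasksByTracker.getOrDefault(tracker, Set.of()));
    }
}
```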


  Corrections and discussion are welcome.
================= References ====
Hadoop source code.
