
Classic MapReduce (MapReduce 1) - Job assignment

2013-07-04 

Tasktrackers run a simple loop that periodically sends heartbeat method calls to the jobtracker.

  /**
   * Main service loop.  Will stay in this loop forever.
   */
  State offerService() throws Exception {
    long lastHeartbeat = System.currentTimeMillis();

    while (running && !shuttingDown) {
      try {
        long now = System.currentTimeMillis();

        // accelerate to account for multiple finished tasks up-front
        long remaining =
          (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;
        while (remaining > 0) {
          // sleeps for the wait time or
          // until there are *enough* empty slots to schedule tasks
          synchronized (finishedCount) {
            finishedCount.wait(remaining);

            // Recompute
            now = System.currentTimeMillis();
            remaining =
              (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now;

            if (remaining <= 0) {
              // Reset count
              finishedCount.set(0);
              break;
            }
          }
        }

        // If the TaskTracker is just starting up:
        // 1. Verify the versions matches with the JobTracker
        // 2. Get the system directory & filesystem
        if(justInited) {
          String jtBuildVersion = jobClient.getBuildVersion();
          String jtVersion = jobClient.getVIVersion();
          if (!isPermittedVersion(jtBuildVersion, jtVersion)) {
            String msg = "Shutting down. Incompatible buildVersion." +
              "\nJobTracker's: " + jtBuildVersion +
              "\nTaskTracker's: "+ VersionInfo.getBuildVersion() +
              " and " + CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY +
              " is " + (relaxedVersionCheck ? "enabled" : "not enabled");
            LOG.fatal(msg);
            try {
              jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
            } catch(Exception e ) {
              LOG.info("Problem reporting to jobtracker: " + e);
            }
            return State.DENIED;
          }

          String dir = jobClient.getSystemDir();
          while (dir == null) {
            LOG.info("Failed to get system directory...");

            // Re-try
            try {
              // Sleep interval: 1000 ms - 5000 ms
              int sleepInterval = 1000 + r.nextInt(4000);
              Thread.sleep(sleepInterval);
            } catch (InterruptedException ie)
            {}
            dir = jobClient.getSystemDir();
          }
          systemDirectory = new Path(dir);
          systemFS = systemDirectory.getFileSystem(fConf);
        }

        now = System.currentTimeMillis();
        if (now > (lastCheckDirsTime + diskHealthCheckInterval)) {
          localStorage.checkDirs();
          lastCheckDirsTime = now;
          int numFailures = localStorage.numFailures();
          // Re-init the task tracker if there were any new failures
          if (numFailures > lastNumFailures) {
            lastNumFailures = numFailures;
            return State.STALE;
          }
        }

        // Send the heartbeat and process the jobtracker's directives
        HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

        // Note the time when the heartbeat returned, use this to decide when to send the
        // next heartbeat
        lastHeartbeat = System.currentTimeMillis();

        // Check if the map-event list needs purging
        Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
        if (jobs.size() > 0) {
          synchronized (this) {
            // purge the local map events list
            for (JobID job : jobs) {
              RunningJob rjob;
              synchronized (runningJobs) {
                rjob = runningJobs.get(job);
                if (rjob != null) {
                  synchronized (rjob) {
                    FetchStatus f = rjob.getFetchStatus();
                    if (f != null) {
                      f.reset();
                    }
                  }
                }
              }
            }

            // Mark the reducers in shuffle for rollback
            synchronized (shouldReset) {
              for (Map.Entry<TaskAttemptID, TaskInProgress> entry
                    : runningTasks.entrySet()) {
                if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
                  this.shouldReset.add(entry.getKey());
                }
              }
            }
          }
        }

        TaskTrackerAction[] actions = heartbeatResponse.getActions();
        if(LOG.isDebugEnabled()) {
          LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
                    heartbeatResponse.getResponseId() + " and " +
                    ((actions != null) ? actions.length : 0) + " actions");
        }
        if (reinitTaskTracker(actions)) {
          return State.STALE;
        }

        // resetting heartbeat interval from the response.
        heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
        justStarted = false;
        justInited = false;
        if (actions != null){
          for(TaskTrackerAction action: actions) {
            if (action instanceof LaunchTaskAction) {
              addToTaskQueue((LaunchTaskAction)action);
            } else if (action instanceof CommitTaskAction) {
              CommitTaskAction commitAction = (CommitTaskAction)action;
              if (!commitResponses.contains(commitAction.getTaskID())) {
                LOG.info("Received commit task action for " +
                          commitAction.getTaskID());
                commitResponses.add(commitAction.getTaskID());
              }
            } else {
              addActionToCleanup(action);
            }
          }
        }
        markUnresponsiveTasks();
        killOverflowingTasks();

        //we've cleaned up, resume normal operation
        if (!acceptNewTasks && isIdle()) {
          acceptNewTasks=true;
        }
        //The check below may not be required every iteration but we are
        //erring on the side of caution here. We have seen many cases where
        //the call to jetty's getLocalPort() returns different values at
        //different times. Being a real paranoid here.
        checkJettyPort(server.getPort());
      } catch (InterruptedException ie) {
        LOG.info("Interrupted. Closing down.");
        return State.INTERRUPTED;
      } catch (DiskErrorException de) {
        String msg = "Exiting task tracker for disk error:\n" +
          StringUtils.stringifyException(de);
        LOG.error(msg);
        synchronized (this) {
          jobClient.reportTaskTrackerError(taskTrackerName,
                                           "DiskErrorException", msg);
        }
        // If we caught a DEE here we have no good dirs, therefore shutdown.
        return State.DENIED;
      } catch (RemoteException re) {
        String reClass = re.getClassName();
        if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
          LOG.info("Tasktracker disallowed by JobTracker.");
          return State.DENIED;
        }
      } catch (Exception except) {
        String msg = "Caught exception: " +
          StringUtils.stringifyException(except);
        LOG.error(msg);
      }
    }

    return State.NORMAL;
  }
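The subtle part of this loop is that the tasktracker does not simply sleep for a fixed interval: it waits on finishedCount, so a finishing task can wake the loop and shorten the interval, letting freed slots reach the jobtracker sooner. Below is a minimal, self-contained sketch of that wait/notify pattern only; the class and member names are illustrative, not Hadoop's, and the sketch simplifies the real logic to "heartbeat as soon as a task has finished or the interval elapses".

  // Illustrative sketch (not Hadoop code) of the wait/notify pattern used by
  // offerService() above: the loop waits out the heartbeat interval, but
  // taskFinished() can wake it early so newly freed slots are reported sooner.
  public class HeartbeatLoopSketch {
    private final Object lock = new Object();
    private int finishedCount = 0;                 // tasks finished since last heartbeat
    private final long heartbeatIntervalMs = 3000; // illustrative value
    private volatile boolean running = true;

    public void serviceLoop() throws InterruptedException {
      long lastHeartbeat = System.currentTimeMillis();
      while (running) {
        synchronized (lock) {
          long remaining = (lastHeartbeat + heartbeatIntervalMs) - System.currentTimeMillis();
          while (remaining > 0 && finishedCount == 0) {
            lock.wait(remaining);                  // sleeps, unless woken by taskFinished()
            remaining = (lastHeartbeat + heartbeatIntervalMs) - System.currentTimeMillis();
          }
          finishedCount = 0;                       // reset, like finishedCount.set(0) above
        }
        System.out.println("sending heartbeat");   // stands in for transmitHeartBeat(now)
        lastHeartbeat = System.currentTimeMillis();
      }
    }

    public void taskFinished() {
      synchronized (lock) {
        finishedCount++;
        lock.notify();                             // wake the service loop early
      }
    }
  }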

Heartbeats tell the jobtracker that a tasktracker is alive, but they also double as a channel for messages. As a part of the heartbeat, a tasktracker will indicate whether it is ready to run a new task, and if it is, the jobtracker will allocate it a task, which it communicates to the tasktracker using the heartbeat return value.

  /**
   * The periodic heartbeat mechanism between the {@link TaskTracker} and
   * the {@link JobTracker}.
   *
   * The {@link JobTracker} processes the status information sent by the
   * {@link TaskTracker} and responds with instructions to start/stop
   * tasks or jobs, and also 'reset' instructions during contingencies.
   */
  public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                  boolean restarted,
                                                  boolean initialContact,
                                                  boolean acceptNewTasks,
                                                  short responseId)
    throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Got heartbeat from: " + status.getTrackerName() +
                " (restarted: " + restarted +
                " initialContact: " + initialContact +
                " acceptNewTasks: " + acceptNewTasks + ")" +
                " with responseId: " + responseId);
    }

    // Make sure heartbeat is from a tasktracker allowed by the jobtracker.
    if (!acceptTaskTracker(status)) {
      throw new DisallowedTaskTrackerException(status);
    }

    // First check if the last heartbeat response got through
    String trackerName = status.getTrackerName();
    long now = clock.getTime();
    if (restarted) {
      faultyTrackers.markTrackerHealthy(status.getHost());
    } else {
      faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);
    }

    HeartbeatResponse prevHeartbeatResponse =
      trackerToHeartbeatResponseMap.get(trackerName);
    boolean addRestartInfo = false;

    if (initialContact != true) {
      // If this isn't the 'initial contact' from the tasktracker,
      // there is something seriously wrong if the JobTracker has
      // no record of the 'previous heartbeat'; if so, ask the
      // tasktracker to re-initialize itself.
      if (prevHeartbeatResponse == null) {
        // This is the first heartbeat from the old tracker to the newly
        // started JobTracker
        if (hasRestarted()) {
          addRestartInfo = true;
          // inform the recovery manager about this tracker joining back
          recoveryManager.unMarkTracker(trackerName);
        } else {
          // Jobtracker might have restarted but no recovery is needed
          // otherwise this code should not be reached
          LOG.warn("Serious problem, cannot find record of 'previous' " +
                   "heartbeat for '" + trackerName +
                   "'; reinitializing the tasktracker");
          return new HeartbeatResponse(responseId,
              new TaskTrackerAction[] {new ReinitTrackerAction()});
        }
      } else {

        // It is completely safe to not process a 'duplicate' heartbeat from a
        // {@link TaskTracker} since it resends the heartbeat when rpcs are
        // lost see {@link TaskTracker.transmitHeartbeat()};
        // acknowledge it by re-sending the previous response to let the
        // {@link TaskTracker} go forward.
        if (prevHeartbeatResponse.getResponseId() != responseId) {
          LOG.info("Ignoring 'duplicate' heartbeat from '" +
              trackerName + "'; resending the previous 'lost' response");
          return prevHeartbeatResponse;
        }
      }
    }

    // Process this heartbeat
    short newResponseId = (short)(responseId + 1);
    status.setLastSeen(now);
    if (!processHeartbeat(status, initialContact, now)) {
      if (prevHeartbeatResponse != null) {
        trackerToHeartbeatResponseMap.remove(trackerName);
      }
      return new HeartbeatResponse(newResponseId,
                   new TaskTrackerAction[] {new ReinitTrackerAction()});
    }

    // Initialize the response to be sent for the heartbeat
    HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
    List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
    boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
    // Check for new tasks to be executed on the tasktracker
    if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
      TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
      if (taskTrackerStatus == null) {
        LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
      } else {
        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
        if (tasks == null ) {
          tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
        }
        if (tasks != null) {
          for (Task task : tasks) {
            expireLaunchingTasks.addNewTask(task.getTaskID());
            if(LOG.isDebugEnabled()) {
              LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
            }
            actions.add(new LaunchTaskAction(task));
          }
        }
      }
    }

    // Check for tasks to be killed
    List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
    if (killTasksList != null) {
      actions.addAll(killTasksList);
    }

    // Check for jobs to be killed/cleanedup
    List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
    if (killJobsList != null) {
      actions.addAll(killJobsList);
    }

    // Check for tasks whose outputs can be saved
    List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
    if (commitTasksList != null) {
      actions.addAll(commitTasksList);
    }

    // calculate next heartbeat interval and put in heartbeat response
    int nextInterval = getNextHeartbeatInterval();
    response.setHeartbeatInterval(nextInterval);
    response.setActions(
                        actions.toArray(new TaskTrackerAction[actions.size()]));

    // check if the restart info is req
    if (addRestartInfo) {
      response.setRecoveredJobs(recoveryManager.getJobsToRecover());
    }

    // Update the trackerToHeartbeatResponseMap
    trackerToHeartbeatResponseMap.put(trackerName, response);

    // Done processing the hearbeat, now remove 'marked' tasks
    removeMarkedTasks(trackerName);

    return response;
  }
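Note how responseId makes the heartbeat effectively idempotent: the jobtracker caches its last response per tracker, and a tasktracker that resends a heartbeat (because the previous response was lost) gets the cached response replayed instead of being processed twice. The following is a minimal, generic sketch of that resend-on-duplicate pattern, with illustrative types rather than Hadoop's:

  import java.util.HashMap;
  import java.util.Map;

  // Generic sketch of the responseId protocol seen in heartbeat() above. Each reply
  // carries a new id; the caller echoes the id it last received. If the echoed id does
  // not match our cached reply's id, the caller never saw that reply, so it is resent
  // rather than processing the request a second time.
  public class IdempotentResponder {

    static class Reply {
      final short id;
      final String body;
      Reply(short id, String body) { this.id = id; this.body = body; }
    }

    private final Map<String, Reply> lastReplyByTracker = new HashMap<String, Reply>();

    public synchronized Reply respond(String trackerName, short echoedId, String request) {
      Reply prev = lastReplyByTracker.get(trackerName);
      if (prev != null && prev.id != echoedId) {
        // Duplicate request: our previous reply was lost in transit, so replay it.
        return prev;
      }
      Reply reply = new Reply((short) (echoedId + 1), "processed: " + request);
      lastReplyByTracker.put(trackerName, reply);
      return reply;
    }
  }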

Below is the default implementation of TaskScheduler.assignTasks in Hadoop. The scheduler is pluggable, so you can choose among several TaskScheduler implementations (such as the fair scheduler; see the configuration sketch that follows), but the default one simply maintains a priority-ordered list of jobs. The tasks it chooses are communicated back to the tasktracker in the heartbeat return value.
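Which implementation runs is a jobtracker configuration choice. A minimal sketch of switching to the fair scheduler, assuming the classic MRv1 property name mapred.jobtracker.taskScheduler; in a real cluster this would be set in mapred-site.xml on the jobtracker rather than in code:

  import org.apache.hadoop.mapred.JobConf;

  // Sketch only: names the MRv1 property that selects the TaskScheduler implementation.
  // It is set programmatically here just to show the key and a possible value.
  public class SchedulerConfigSketch {
    public static void main(String[] args) {
      JobConf conf = new JobConf();
      conf.set("mapred.jobtracker.taskScheduler",
               "org.apache.hadoop.mapred.FairScheduler");
      System.out.println("Scheduler: " + conf.get("mapred.jobtracker.taskScheduler"));
    }
  }

When this property is left unset, the JobQueueTaskScheduler shown next is used.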

  /**
   * Returns the tasks we'd like the TaskTracker to execute right now.
   *
   * @param taskTracker The TaskTracker for which we're looking for tasks.
   * @return A list of tasks to run on that TaskTracker, possibly empty.
   */
  @Override
  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
      throws IOException {
    // Check for JT safe-mode
    if (taskTrackerManager.isInSafeMode()) {
      LOG.info("JobTracker is in safe-mode, not scheduling any tasks.");
      return null;
    }

    TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
    final int numTaskTrackers = clusterStatus.getTaskTrackers();
    final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
    final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();

    Collection<JobInProgress> jobQueue =
      jobQueueJobInProgressListener.getJobQueue();

    //
    // Get map + reduce counts for the current tracker.
    //
    final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots();
    final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots();
    final int trackerRunningMaps = taskTrackerStatus.countMapTasks();
    final int trackerRunningReduces = taskTrackerStatus.countReduceTasks();

    // Assigned tasks
    List<Task> assignedTasks = new ArrayList<Task>();

    //
    // Compute (running + pending) map and reduce task numbers across pool
    //
    int remainingReduceLoad = 0;
    int remainingMapLoad = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
          remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
          if (job.scheduleReduces()) {
            remainingReduceLoad +=
              (job.desiredReduces() - job.finishedReduces());
          }
        }
      }
    }

    // Compute the 'load factor' for maps and reduces
    double mapLoadFactor = 0.0;
    if (clusterMapCapacity > 0) {
      mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
    }
    double reduceLoadFactor = 0.0;
    if (clusterReduceCapacity > 0) {
      reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
    }

    //
    // In the below steps, we allocate first map tasks (if appropriate),
    // and then reduce tasks if appropriate.  We go through all jobs
    // in order of job arrival; jobs only get serviced if their
    // predecessors are serviced, too.
    //

    //
    // We assign tasks to the current taskTracker if the given machine
    // has a workload that's less than the maximum load of that kind of
    // task.
    // However, if the cluster is close to getting loaded i.e. we don't
    // have enough _padding_ for speculative executions etc., we only
    // schedule the "highest priority" task i.e. the task from the job
    // with the highest priority.
    //

    final int trackerCurrentMapCapacity =
      Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),
                              trackerMapCapacity);
    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
    boolean exceededMapPadding = false;
    if (availableMapSlots > 0) {
      exceededMapPadding =
        exceededPadding(true, clusterStatus, trackerMapCapacity);
    }

    int numLocalMaps = 0;
    int numNonLocalMaps = 0;
    scheduleMaps:
    for (int i=0; i < availableMapSlots; ++i) {
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
            continue;
          }

          Task t = null;

          // Try to schedule a node-local or rack-local Map task
          t =
            job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus,
                numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());
          if (t != null) {
            assignedTasks.add(t);
            ++numLocalMaps;

            // Don't assign map tasks to the hilt!
            // Leave some free slots in the cluster for future task-failures,
            // speculative tasks etc. beyond the highest priority job
            if (exceededMapPadding) {
              break scheduleMaps;
            }

            // Try all jobs again for the next Map task
            break;
          }

          // Try to schedule a node-local or rack-local Map task
          t =
            job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                  taskTrackerManager.getNumberOfUniqueHosts());

          if (t != null) {
            assignedTasks.add(t);
            ++numNonLocalMaps;

            // We assign at most 1 off-switch or speculative task
            // This is to prevent TaskTrackers from stealing local-tasks
            // from other TaskTrackers.
            break scheduleMaps;
          }
        }
      }
    }
    int assignedMaps = assignedTasks.size();

    //
    // Same thing, but for reduce tasks
    // However we _never_ assign more than 1 reduce task per heartbeat
    //
    final int trackerCurrentReduceCapacity =
      Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity),
               trackerReduceCapacity);
    final int availableReduceSlots =
      Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
    boolean exceededReducePadding = false;
    if (availableReduceSlots > 0) {
      exceededReducePadding = exceededPadding(false, clusterStatus,
                                              trackerReduceCapacity);
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING ||
              job.numReduceTasks == 0) {
            continue;
          }

          Task t =
            job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
                                    taskTrackerManager.getNumberOfUniqueHosts()
                                    );
          if (t != null) {
            assignedTasks.add(t);
            break;
          }

          // Don't assign reduce tasks to the hilt!
          // Leave some free slots in the cluster for future task-failures,
          // speculative tasks etc. beyond the highest priority job
          if (exceededReducePadding) {
            break;
          }
        }
      }
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() + " --> " +
                "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " +
                trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" +
                (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
                assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps +
                ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " +
                trackerCurrentReduceCapacity + "," + trackerRunningReduces +
                "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) +
                ", " + (assignedTasks.size()-assignedMaps) + "]");
    }

    return assignedTasks;
  }
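To make the capacity arithmetic above concrete, here is a small worked example with made-up numbers: 40 remaining map tasks on a cluster with 100 map slots, and a tasktracker that has 4 map slots of which 1 is in use.

  // Worked example (made-up numbers) of the load-factor computation in assignTasks.
  public class LoadFactorExample {
    public static void main(String[] args) {
      int remainingMapLoad = 40;      // running + pending map tasks across all jobs
      int clusterMapCapacity = 100;   // total map slots in the cluster
      int trackerMapCapacity = 4;     // map slots on this tasktracker
      int trackerRunningMaps = 1;     // map tasks it is already running

      double mapLoadFactor = (double) remainingMapLoad / clusterMapCapacity;   // 0.4
      int trackerCurrentMapCapacity =
          Math.min((int) Math.ceil(mapLoadFactor * trackerMapCapacity),
                   trackerMapCapacity);                                        // ceil(1.6) = 2
      int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;  // 2 - 1 = 1

      // Even though the tracker has 3 idle slots, at most 1 map task will be
      // assigned to it on this heartbeat, spreading a light cluster load evenly
      // across tasktrackers instead of filling one tracker to the hilt.
      System.out.println("availableMapSlots = " + availableMapSlots);
    }
  }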

Tasktrackers have a fixed number of slots for map tasks and for reduce tasks, and these are set independently. The default scheduler fills empty map task slots before reduce task slots, so if the tasktracker has at least one empty map task slot, the jobtracker will select a map task; otherwise it will select a reduce task. The relevant section of JobQueueTaskScheduler is repeated below.

  // Inside JobQueueTaskScheduler.java

    //
    // In the below steps, we allocate first map tasks (if appropriate),
    // and then reduce tasks if appropriate.  We go through all jobs
    // in order of job arrival; jobs only get serviced if their
    // predecessors are serviced, too.
    //

    //
    // We assign tasks to the current taskTracker if the given machine
    // has a workload that's less than the maximum load of that kind of
    // task.
    // However, if the cluster is close to getting loaded i.e. we don't
    // have enough _padding_ for speculative executions etc., we only
    // schedule the "highest priority" task i.e. the task from the job
    // with the highest priority.
    //

    final int trackerCurrentMapCapacity =
      Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),
                              trackerMapCapacity);
    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
    boolean exceededMapPadding = false;
    if (availableMapSlots > 0) {
      exceededMapPadding =
        exceededPadding(true, clusterStatus, trackerMapCapacity);
    }

    int numLocalMaps = 0;
    int numNonLocalMaps = 0;
    scheduleMaps:
    for (int i=0; i < availableMapSlots; ++i) {
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
            continue;
          }

          Task t = null;

          // Try to schedule a node-local or rack-local Map task
          t =
            job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus,
                numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());
          if (t != null) {
            assignedTasks.add(t);
            ++numLocalMaps;

            // Don't assign map tasks to the hilt!
            // Leave some free slots in the cluster for future task-failures,
            // speculative tasks etc. beyond the highest priority job
            if (exceededMapPadding) {
              break scheduleMaps;
            }

            // Try all jobs again for the next Map task
            break;
          }

          // Try to schedule a node-local or rack-local Map task
          t =
            job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                  taskTrackerManager.getNumberOfUniqueHosts());

          if (t != null) {
            assignedTasks.add(t);
            ++numNonLocalMaps;

            // We assign at most 1 off-switch or speculative task
            // This is to prevent TaskTrackers from stealing local-tasks
            // from other TaskTrackers.
            break scheduleMaps;
          }
        }
      }
    }
    int assignedMaps = assignedTasks.size();

    //
    // Same thing, but for reduce tasks
    // However we _never_ assign more than 1 reduce task per heartbeat
    //
    final int trackerCurrentReduceCapacity =
      Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity),
               trackerReduceCapacity);
    final int availableReduceSlots =
      Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
    boolean exceededReducePadding = false;
    if (availableReduceSlots > 0) {
      exceededReducePadding = exceededPadding(false, clusterStatus,
                                              trackerReduceCapacity);
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING ||
              job.numReduceTasks == 0) {
            continue;
          }

          Task t =
            job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
                                    taskTrackerManager.getNumberOfUniqueHosts()
                                    );
          if (t != null) {
            assignedTasks.add(t);
            break;
          }

          // Don't assign reduce tasks to the hilt!
          // Leave some free slots in the cluster for future task-failures,
          // speculative tasks etc. beyond the highest priority job
          if (exceededReducePadding) {
            break;
          }
        }
      }
    }

Whichever scheduler you use, the precise number of slots configured on a tasktracker depends on the number of cores and the amount of memory on that machine.
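The per-tasktracker slot counts are ordinary configuration values. A minimal sketch reading them with their MRv1 keys, mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum; in practice they are set in mapred-site.xml on each tasktracker.

  import org.apache.hadoop.conf.Configuration;

  // Sketch: reading the per-tasktracker slot configuration with the MRv1 keys.
  // The fallback values passed to getInt (2 and 2) match the Hadoop 1.x defaults.
  public class SlotConfigSketch {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      int maxMapSlots = conf.getInt("mapred.tasktracker.map.tasks.maximum", 2);
      int maxReduceSlots = conf.getInt("mapred.tasktracker.reduce.tasks.maximum", 2);
      System.out.println("map slots: " + maxMapSlots + ", reduce slots: " + maxReduceSlots);
    }
  }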


By looking into the default scheduler's source code, we can see that a map task may be data-local, rack-local, or non-local, while for a reduce task the scheduler simply takes the next one in its list of yet-to-be-run reduce tasks, since there are no data locality considerations. You can tell the proportion of each type of map task by looking at a job's counters.
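For example, the MRv1 job counters DATA_LOCAL_MAPS and RACK_LOCAL_MAPS count the data-local and rack-local map tasks. Below is a hedged sketch of reading them through the old JobClient API; the job ID is hypothetical, and the counter group name is assumed to be the fully qualified JobInProgress$Counter enum name that MRv1 uses internally. The same numbers also appear on the job page of the jobtracker web UI.

  import org.apache.hadoop.mapred.Counters;
  import org.apache.hadoop.mapred.JobClient;
  import org.apache.hadoop.mapred.JobConf;
  import org.apache.hadoop.mapred.JobID;
  import org.apache.hadoop.mapred.RunningJob;

  // Sketch: reading map-locality counters for a finished job. The job ID below is
  // hypothetical, and the counter group name is assumed to be the MRv1 internal
  // group "org.apache.hadoop.mapred.JobInProgress$Counter".
  public class LocalityCountersSketch {
    public static void main(String[] args) throws Exception {
      JobClient client = new JobClient(new JobConf());
      RunningJob job = client.getJob(JobID.forName("job_201307040001_0001"));
      Counters counters = job.getCounters();

      long dataLocal = counters.findCounter(
          "org.apache.hadoop.mapred.JobInProgress$Counter", "DATA_LOCAL_MAPS").getCounter();
      long rackLocal = counters.findCounter(
          "org.apache.hadoop.mapred.JobInProgress$Counter", "RACK_LOCAL_MAPS").getCounter();

      System.out.println("data-local maps: " + dataLocal);
      System.out.println("rack-local maps: " + rackLocal);
    }
  }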
