Clsssic MapReduce (MapReduce 1) - Job assignment
Tasktrackers run a simple loop that periodically sends heartbeat method calls to the jobtracker.
/** * Main service loop. Will stay in this loop forever. */ State offerService() throws Exception { long lastHeartbeat = System.currentTimeMillis(); while (running && !shuttingDown) { try { long now = System.currentTimeMillis(); // accelerate to account for multiple finished tasks up-front long remaining = (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now; while (remaining > 0) { // sleeps for the wait time or // until there are *enough* empty slots to schedule tasks synchronized (finishedCount) { finishedCount.wait(remaining); // Recompute now = System.currentTimeMillis(); remaining = (lastHeartbeat + getHeartbeatInterval(finishedCount.get())) - now; if (remaining <= 0) { // Reset count finishedCount.set(0); break; } } } // If the TaskTracker is just starting up: // 1. Verify the versions matches with the JobTracker // 2. Get the system directory & filesystem if(justInited) { String jtBuildVersion = jobClient.getBuildVersion(); String jtVersion = jobClient.getVIVersion(); if (!isPermittedVersion(jtBuildVersion, jtVersion)) { String msg = "Shutting down. Incompatible buildVersion." + "\nJobTracker's: " + jtBuildVersion + "\nTaskTracker's: "+ VersionInfo.getBuildVersion() + " and " + CommonConfigurationKeys.HADOOP_RELAXED_VERSION_CHECK_KEY + " is " + (relaxedVersionCheck ? "enabled" : "not enabled"); LOG.fatal(msg); try { jobClient.reportTaskTrackerError(taskTrackerName, null, msg); } catch(Exception e ) { LOG.info("Problem reporting to jobtracker: " + e); } return State.DENIED; } String dir = jobClient.getSystemDir(); while (dir == null) { LOG.info("Failed to get system directory..."); // Re-try try { // Sleep interval: 1000 ms - 5000 ms int sleepInterval = 1000 + r.nextInt(4000); Thread.sleep(sleepInterval); } catch (InterruptedException ie) {} dir = jobClient.getSystemDir(); } systemDirectory = new Path(dir); systemFS = systemDirectory.getFileSystem(fConf); } now = System.currentTimeMillis(); if (now > (lastCheckDirsTime + diskHealthCheckInterval)) { localStorage.checkDirs(); lastCheckDirsTime = now; int numFailures = localStorage.numFailures(); // Re-init the task tracker if there were any new failures if (numFailures > lastNumFailures) { lastNumFailures = numFailures; return State.STALE; } } // Send the heartbeat and process the jobtracker's directives HeartbeatResponse heartbeatResponse = transmitHeartBeat(now); // Note the time when the heartbeat returned, use this to decide when to send the // next heartbeat lastHeartbeat = System.currentTimeMillis(); // Check if the map-event list needs purging Set<JobID> jobs = heartbeatResponse.getRecoveredJobs(); if (jobs.size() > 0) { synchronized (this) { // purge the local map events list for (JobID job : jobs) { RunningJob rjob; synchronized (runningJobs) { rjob = runningJobs.get(job); if (rjob != null) { synchronized (rjob) { FetchStatus f = rjob.getFetchStatus(); if (f != null) { f.reset(); } } } } } // Mark the reducers in shuffle for rollback synchronized (shouldReset) { for (Map.Entry<TaskAttemptID, TaskInProgress> entry : runningTasks.entrySet()) { if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) { this.shouldReset.add(entry.getKey()); } } } } } TaskTrackerAction[] actions = heartbeatResponse.getActions(); if(LOG.isDebugEnabled()) { LOG.debug("Got heartbeatResponse from JobTracker with responseId: " + heartbeatResponse.getResponseId() + " and " + ((actions != null) ? actions.length : 0) + " actions"); } if (reinitTaskTracker(actions)) { return State.STALE; } // resetting heartbeat interval from the response. heartbeatInterval = heartbeatResponse.getHeartbeatInterval(); justStarted = false; justInited = false; if (actions != null){ for(TaskTrackerAction action: actions) { if (action instanceof LaunchTaskAction) { addToTaskQueue((LaunchTaskAction)action); } else if (action instanceof CommitTaskAction) { CommitTaskAction commitAction = (CommitTaskAction)action; if (!commitResponses.contains(commitAction.getTaskID())) { LOG.info("Received commit task action for " + commitAction.getTaskID()); commitResponses.add(commitAction.getTaskID()); } } else { addActionToCleanup(action); } } } markUnresponsiveTasks(); killOverflowingTasks(); //we've cleaned up, resume normal operation if (!acceptNewTasks && isIdle()) { acceptNewTasks=true; } //The check below may not be required every iteration but we are //erring on the side of caution here. We have seen many cases where //the call to jetty's getLocalPort() returns different values at //different times. Being a real paranoid here. checkJettyPort(server.getPort()); } catch (InterruptedException ie) { LOG.info("Interrupted. Closing down."); return State.INTERRUPTED; } catch (DiskErrorException de) { String msg = "Exiting task tracker for disk error:\n" + StringUtils.stringifyException(de); LOG.error(msg); synchronized (this) { jobClient.reportTaskTrackerError(taskTrackerName, "DiskErrorException", msg); } // If we caught a DEE here we have no good dirs, therefore shutdown. return State.DENIED; } catch (RemoteException re) { String reClass = re.getClassName(); if (DisallowedTaskTrackerException.class.getName().equals(reClass)) { LOG.info("Tasktracker disallowed by JobTracker."); return State.DENIED; } } catch (Exception except) { String msg = "Caught exception: " + StringUtils.stringifyException(except); LOG.error(msg); } } return State.NORMAL; }
?Heartbeats tell the jobtracker that a tasktracker is alive, but they also double as a channel for messages. As a part of the heartbeat, a tasktracker will indicate whether it is ready to run a new task, and if it is, the jobtracker will allocate it a task, which it communicates to the tasktracker using the heartbeat return value.
/** * The periodic heartbeat mechanism between the {@link TaskTracker} and * the {@link JobTracker}. * * The {@link JobTracker} processes the status information sent by the * {@link TaskTracker} and responds with instructions to start/stop * tasks or jobs, and also 'reset' instructions during contingencies. */ public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("Got heartbeat from: " + status.getTrackerName() + " (restarted: " + restarted + " initialContact: " + initialContact + " acceptNewTasks: " + acceptNewTasks + ")" + " with responseId: " + responseId); } // Make sure heartbeat is from a tasktracker allowed by the jobtracker. if (!acceptTaskTracker(status)) { throw new DisallowedTaskTrackerException(status); } // First check if the last heartbeat response got through String trackerName = status.getTrackerName(); long now = clock.getTime(); if (restarted) { faultyTrackers.markTrackerHealthy(status.getHost()); } else { faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now); } HeartbeatResponse prevHeartbeatResponse = trackerToHeartbeatResponseMap.get(trackerName); boolean addRestartInfo = false; if (initialContact != true) { // If this isn't the 'initial contact' from the tasktracker, // there is something seriously wrong if the JobTracker has // no record of the 'previous heartbeat'; if so, ask the // tasktracker to re-initialize itself. if (prevHeartbeatResponse == null) { // This is the first heartbeat from the old tracker to the newly // started JobTracker if (hasRestarted()) { addRestartInfo = true; // inform the recovery manager about this tracker joining back recoveryManager.unMarkTracker(trackerName); } else { // Jobtracker might have restarted but no recovery is needed // otherwise this code should not be reached LOG.warn("Serious problem, cannot find record of 'previous' " + "heartbeat for '" + trackerName + "'; reinitializing the tasktracker"); return new HeartbeatResponse(responseId, new TaskTrackerAction[] {new ReinitTrackerAction()}); } } else { // It is completely safe to not process a 'duplicate' heartbeat from a // {@link TaskTracker} since it resends the heartbeat when rpcs are // lost see {@link TaskTracker.transmitHeartbeat()}; // acknowledge it by re-sending the previous response to let the // {@link TaskTracker} go forward. if (prevHeartbeatResponse.getResponseId() != responseId) { LOG.info("Ignoring 'duplicate' heartbeat from '" + trackerName + "'; resending the previous 'lost' response"); return prevHeartbeatResponse; } } } // Process this heartbeat short newResponseId = (short)(responseId + 1); status.setLastSeen(now); if (!processHeartbeat(status, initialContact, now)) { if (prevHeartbeatResponse != null) { trackerToHeartbeatResponseMap.remove(trackerName); } return new HeartbeatResponse(newResponseId, new TaskTrackerAction[] {new ReinitTrackerAction()}); } // Initialize the response to be sent for the heartbeat HeartbeatResponse response = new HeartbeatResponse(newResponseId, null); List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>(); boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost()); // Check for new tasks to be executed on the tasktracker if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) { TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName); if (taskTrackerStatus == null) { LOG.warn("Unknown task tracker polling; ignoring: " + trackerName); } else { List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus); if (tasks == null ) { tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName)); } if (tasks != null) { for (Task task : tasks) { expireLaunchingTasks.addNewTask(task.getTaskID()); if(LOG.isDebugEnabled()) { LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID()); } actions.add(new LaunchTaskAction(task)); } } } } // Check for tasks to be killed List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName); if (killTasksList != null) { actions.addAll(killTasksList); } // Check for jobs to be killed/cleanedup List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName); if (killJobsList != null) { actions.addAll(killJobsList); } // Check for tasks whose outputs can be saved List<TaskTrackerAction> commitTasksList = getTasksToSave(status); if (commitTasksList != null) { actions.addAll(commitTasksList); } // calculate next heartbeat interval and put in heartbeat response int nextInterval = getNextHeartbeatInterval(); response.setHeartbeatInterval(nextInterval); response.setActions( actions.toArray(new TaskTrackerAction[actions.size()])); // check if the restart info is req if (addRestartInfo) { response.setRecoveredJobs(recoveryManager.getJobsToRecover()); } // Update the trackerToHeartbeatResponseMap trackerToHeartbeatResponseMap.put(trackerName, response); // Done processing the hearbeat, now remove 'marked' tasks removeMarkedTasks(trackerName); return response; }
?Below is the default inplementation of TaskScheduler in Hadoop of assignTasks(you can choose among several TaskSchedulers implementation such as fair scheduler). The default one simply maintains a priority list of jobs. The tasks to run is?communicates to the tasktracker using the heartbeat return value.
/** * Returns the tasks we'd like the TaskTracker to execute right now. * * @param taskTracker The TaskTracker for which we're looking for tasks. * @return A list of tasks to run on that TaskTracker, possibly empty. */ @Override public synchronized List<Task> assignTasks(TaskTracker taskTracker) throws IOException { // Check for JT safe-mode if (taskTrackerManager.isInSafeMode()) { LOG.info("JobTracker is in safe-mode, not scheduling any tasks."); return null; } TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus(); final int numTaskTrackers = clusterStatus.getTaskTrackers(); final int clusterMapCapacity = clusterStatus.getMaxMapTasks(); final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks(); Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue(); // // Get map + reduce counts for the current tracker. // final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots(); final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots(); final int trackerRunningMaps = taskTrackerStatus.countMapTasks(); final int trackerRunningReduces = taskTrackerStatus.countReduceTasks(); // Assigned tasks List<Task> assignedTasks = new ArrayList<Task>(); // // Compute (running + pending) map and reduce task numbers across pool // int remainingReduceLoad = 0; int remainingMapLoad = 0; synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() == JobStatus.RUNNING) { remainingMapLoad += (job.desiredMaps() - job.finishedMaps()); if (job.scheduleReduces()) { remainingReduceLoad += (job.desiredReduces() - job.finishedReduces()); } } } } // Compute the 'load factor' for maps and reduces double mapLoadFactor = 0.0; if (clusterMapCapacity > 0) { mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity; } double reduceLoadFactor = 0.0; if (clusterReduceCapacity > 0) { reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity; } // // In the below steps, we allocate first map tasks (if appropriate), // and then reduce tasks if appropriate. We go through all jobs // in order of job arrival; jobs only get serviced if their // predecessors are serviced, too. // // // We assign tasks to the current taskTracker if the given machine // has a workload that's less than the maximum load of that kind of // task. // However, if the cluster is close to getting loaded i.e. we don't // have enough _padding_ for speculative executions etc., we only // schedule the "highest priority" task i.e. the task from the job // with the highest priority. // final int trackerCurrentMapCapacity = Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), trackerMapCapacity); int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps; boolean exceededMapPadding = false; if (availableMapSlots > 0) { exceededMapPadding = exceededPadding(true, clusterStatus, trackerMapCapacity); } int numLocalMaps = 0; int numNonLocalMaps = 0; scheduleMaps: for (int i=0; i < availableMapSlots; ++i) { synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() != JobStatus.RUNNING) { continue; } Task t = null; // Try to schedule a node-local or rack-local Map task t = job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { assignedTasks.add(t); ++numLocalMaps; // Don't assign map tasks to the hilt! // Leave some free slots in the cluster for future task-failures, // speculative tasks etc. beyond the highest priority job if (exceededMapPadding) { break scheduleMaps; } // Try all jobs again for the next Map task break; } // Try to schedule a node-local or rack-local Map task t = job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { assignedTasks.add(t); ++numNonLocalMaps; // We assign at most 1 off-switch or speculative task // This is to prevent TaskTrackers from stealing local-tasks // from other TaskTrackers. break scheduleMaps; } } } } int assignedMaps = assignedTasks.size(); // // Same thing, but for reduce tasks // However we _never_ assign more than 1 reduce task per heartbeat // final int trackerCurrentReduceCapacity = Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity), trackerReduceCapacity); final int availableReduceSlots = Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1); boolean exceededReducePadding = false; if (availableReduceSlots > 0) { exceededReducePadding = exceededPadding(false, clusterStatus, trackerReduceCapacity); synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() != JobStatus.RUNNING || job.numReduceTasks == 0) { continue; } Task t = job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts() ); if (t != null) { assignedTasks.add(t); break; } // Don't assign reduce tasks to the hilt! // Leave some free slots in the cluster for future task-failures, // speculative tasks etc. beyond the highest priority job if (exceededReducePadding) { break; } } } } if (LOG.isDebugEnabled()) { LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() + " --> " + "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " + trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" + (trackerCurrentMapCapacity - trackerRunningMaps) + ", " + assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps + ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " + trackerCurrentReduceCapacity + "," + trackerRunningReduces + "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) + ", " + (assignedTasks.size()-assignedMaps) + "]"); } return assignedTasks; }
?Tasktrackers have a fixed number of slots for map tasks and for reduce tasks, and these are set independently. In the context of a given job, the default scheduler fills empty map task slot, the jobtracker will select a map task; otherwise it will select a reduce task.
// Inside JobQueueTaskScheduler.java// // In the below steps, we allocate first map tasks (if appropriate), // and then reduce tasks if appropriate. We go through all jobs // in order of job arrival; jobs only get serviced if their // predecessors are serviced, too. // // // We assign tasks to the current taskTracker if the given machine // has a workload that's less than the maximum load of that kind of // task. // However, if the cluster is close to getting loaded i.e. we don't // have enough _padding_ for speculative executions etc., we only // schedule the "highest priority" task i.e. the task from the job // with the highest priority. // final int trackerCurrentMapCapacity = Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), trackerMapCapacity); int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps; boolean exceededMapPadding = false; if (availableMapSlots > 0) { exceededMapPadding = exceededPadding(true, clusterStatus, trackerMapCapacity); } int numLocalMaps = 0; int numNonLocalMaps = 0; scheduleMaps: for (int i=0; i < availableMapSlots; ++i) { synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() != JobStatus.RUNNING) { continue; } Task t = null; // Try to schedule a node-local or rack-local Map task t = job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { assignedTasks.add(t); ++numLocalMaps; // Don't assign map tasks to the hilt! // Leave some free slots in the cluster for future task-failures, // speculative tasks etc. beyond the highest priority job if (exceededMapPadding) { break scheduleMaps; } // Try all jobs again for the next Map task break; } // Try to schedule a node-local or rack-local Map task t = job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { assignedTasks.add(t); ++numNonLocalMaps; // We assign at most 1 off-switch or speculative task // This is to prevent TaskTrackers from stealing local-tasks // from other TaskTrackers. break scheduleMaps; } } } } int assignedMaps = assignedTasks.size(); // // Same thing, but for reduce tasks // However we _never_ assign more than 1 reduce task per heartbeat // final int trackerCurrentReduceCapacity = Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity), trackerReduceCapacity); final int availableReduceSlots = Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1); boolean exceededReducePadding = false; if (availableReduceSlots > 0) { exceededReducePadding = exceededPadding(false, clusterStatus, trackerReduceCapacity); synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() != JobStatus.RUNNING || job.numReduceTasks == 0) { continue; } Task t = job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts() ); if (t != null) { assignedTasks.add(t); break; } // Don't assign reduce tasks to the hilt! // Leave some free slots in the cluster for future task-failures, // speculative tasks etc. beyond the highest priority job if (exceededReducePadding) { break; } } } }
?If we don't use the default scheduler, the precise number of slots depends on the number of cores and the amount of memory on the tasktracker.
?
By looking into the default scheduler's source code, we know that for a map task, it maybe data-local, rack-local or non-rack-local, for a reduce task it will simply takes the next in its list of yet-to-be-run reduce tasks. You can tell the proportion of each type of task by looking at a job's counters.