[Hadoop Source Code Reading] (6) MapReduce: The MapTask Class
MapTask extends Task, and its most important method is run(), which executes the map task.
run() first creates and starts a TaskReporter. It then asks the JobConf whether the job uses the new API (via getUseNewMapper(); how this flag gets set was covered in [Hadoop Source Code Reading] (3) MapReduce: The Job Class), and calls the initialize() method inherited from Task to initialize this task. Depending on what kind of task attempt this is, it then runs runJobCleanupTask(), runJobSetupTask(), runTaskCleanupTask(), or the actual Mapper; in the Mapper case it dispatches to the new or old MapReduce runtime according to that API flag.
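The overall control flow looks roughly like this (a condensed sketch of MapTask.run() following the Hadoop 1.x layout; the reporter constructor takes extra arguments in some versions, and error handling and some bookkeeping are omitted):

@Override
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {

  // 1. Start a reporter thread that feeds progress back to the TaskTracker.
  TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
  reporter.startCommunicationThread();

  // 2. Decide which API to run and initialize the task.
  boolean useNewApi = job.getUseNewMapper();   // set when the Job was submitted
  initialize(job, getJobID(), reporter, useNewApi);

  // 3. Setup/cleanup task attempts never run a Mapper; they return early.
  if (jobCleanup)  { runJobCleanupTask(umbilical, reporter);  return; }
  if (jobSetup)    { runJobSetupTask(umbilical, reporter);    return; }
  if (taskCleanup) { runTaskCleanupTask(umbilical, reporter); return; }

  // 4. A real map task dispatches on the API version.
  if (useNewApi) {
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
  } else {
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}

runNewMapper() is where the Mapper, the InputFormat, and the output collector get wired together, which brings us to how map output is written.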
The write() method a Mapper calls on its Context is declared by TaskInputOutputContext. When the job has zero reduce tasks, runNewMapper() backs it with a NewDirectOutputCollector, which bypasses the sort/spill machinery and writes map output straight through the job's OutputFormat:
private class NewDirectOutputCollector<K,V>
    extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
  private final org.apache.hadoop.mapreduce.RecordWriter out;
  private final TaskReporter reporter;
  private final Counters.Counter mapOutputRecordCounter;
  private final Counters.Counter fileOutputByteCounter;
  private final Statistics fsStats;

  @SuppressWarnings("unchecked")
  NewDirectOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
      JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter)
      throws IOException, ClassNotFoundException, InterruptedException {
    this.reporter = reporter;
    Statistics matchedStats = null;
    // outputFormat is a field inherited from Task; the inner class reads the
    // enclosing class's member directly
    if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
      matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
          .getOutputPath(jobContext), job);
    }
    fsStats = matchedStats;
    mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
    fileOutputByteCounter = reporter
        .getCounter(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.Counter.BYTES_WRITTEN);

    long bytesOutPrev = getOutputBytes(fsStats);
    // The key line: get the RecordWriter from the configured OutputFormat
    out = outputFormat.getRecordWriter(taskContext);
    long bytesOutCurr = getOutputBytes(fsStats);
    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
  }

  @Override
  @SuppressWarnings("unchecked")
  public void write(K key, V value)
      throws IOException, InterruptedException {
    reporter.progress();  // report progress
    long bytesOutPrev = getOutputBytes(fsStats);
    out.write(key, value);  // collect one record with out, which came from the configured OutputFormat
    long bytesOutCurr = getOutputBytes(fsStats);
    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);  // update the output byte counter
    mapOutputRecordCounter.increment(1);  // update the count of output K/V pairs
  }

  @Override
  public void close(TaskAttemptContext context)
      throws IOException, InterruptedException {
    reporter.progress();
    if (out != null) {
      long bytesOutPrev = getOutputBytes(fsStats);
      out.close(context);
      long bytesOutCurr = getOutputBytes(fsStats);
      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
    }
  }

  private long getOutputBytes(Statistics stats) {
    return stats == null ? 0 : stats.getBytesWritten();
  }
}
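To see this path end to end, here is a minimal map-only job (a sketch for illustration: the class name PassThroughJob and the input/output paths are made up, and the style targets the Hadoop 1.x-era new API this series covers). Because setNumReduceTasks(0) leaves the job with no reducers, every context.write() in the Mapper lands in NewDirectOutputCollector.write() above and goes straight to the RecordWriter of the default TextOutputFormat.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// A map-only job: with zero reduce tasks, each context.write() below goes
// through NewDirectOutputCollector straight to the OutputFormat's RecordWriter.
public class PassThroughJob {

  public static class PassThroughMapper
      extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(key, value);  // ends up in NewDirectOutputCollector.write()
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "pass-through");
    job.setJarByClass(PassThroughJob.class);
    job.setMapperClass(PassThroughMapper.class);
    job.setNumReduceTasks(0);  // this is what selects NewDirectOutputCollector
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));   // paths are placeholders
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}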
There is also the old MapReduce API path, driven by runOldMapper(); I won't go into that here.