首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 服务器 > 云计算 >

[Hadoop源码解读](6)MapReduce篇之MapTask类

2012-09-18 
[Hadoop源码解读](六)MapReduce篇之MapTask类MapTask类继承于Task类,它最主要的方法就是run(),用来执行这

[Hadoop源码解读](六)MapReduce篇之MapTask类

MapTask类继承于Task类,它最主要的方法就是run(),用来执行这个Map任务。

  run()首先设置一个TaskReporter并启动,然后调用JobConf的getUseNewAPI()判断是否使用New API,使用New API的设置在前面[Hadoop源码解读](三)MapReduce篇之Job类 讲到过,再调用Task继承来的initialize()方法初始化这个task,接着根据需要执行runJobCleanupTask()、runJobSetupTask()、runTaskCleanupTask()或相应的Mapper,执行Mapper时根据情况使用不同版本的MapReduce,这个版本是设置参数决定的。


write()方法是由TaskInputOutputContext来的:

  private class NewDirectOutputCollector<K,V>  extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {    private final org.apache.hadoop.mapreduce.RecordWriter out;    private final TaskReporter reporter;    private final Counters.Counter mapOutputRecordCounter;    private final Counters.Counter fileOutputByteCounter;     private final Statistics fsStats;        @SuppressWarnings("unchecked")    NewDirectOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,        JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter)     throws IOException, ClassNotFoundException, InterruptedException {      this.reporter = reporter;      Statistics matchedStats = null;      if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) { //outputFormat是Task来的,内部类访问外部类成员变量        matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat            .getOutputPath(jobContext), job);      }      fsStats = matchedStats;      mapOutputRecordCounter =         reporter.getCounter(MAP_OUTPUT_RECORDS);      fileOutputByteCounter = reporter          .getCounter(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.Counter.BYTES_WRITTEN);      long bytesOutPrev = getOutputBytes(fsStats);      out = outputFormat.getRecordWriter(taskContext); //主要是这句,获取设置的OutputputFormat里的RecordWriter      long bytesOutCurr = getOutputBytes(fsStats);      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);    }    @Override    @SuppressWarnings("unchecked")    public void write(K key, V value)     throws IOException, InterruptedException {      reporter.progress();  //报告一下进度      long bytesOutPrev = getOutputBytes(fsStats);      out.write(key, value);//使用out收集一条记录,out是设置的OutputFormat来的。      long bytesOutCurr = getOutputBytes(fsStats);      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);  //更新输出字节数      mapOutputRecordCounter.increment(1);      //更新输出K/V对数量    }    @Override    public void close(TaskAttemptContext context)     throws IOException,InterruptedException {      reporter.progress();      if (out != null) {        long bytesOutPrev = getOutputBytes(fsStats);        out.close(context);        long bytesOutCurr = getOutputBytes(fsStats);        fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);      }    }    private long getOutputBytes(Statistics stats) {      return stats == null ? 0 : stats.getBytesWritten();    }  }

另外还有一些以runOldMapper()为主导的旧MapReduce API那套,就不进行讨论了。


热点排行