首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 服务器 > 云计算 >

[Hadoop源码解读](2)MapReduce篇之Mapper类

2012-08-28 
[Hadoop源码解读](二)MapReduce篇之Mapper类前面在讲InputFormat的时候,讲到了Mapper类是如何利用RecordRe

[Hadoop源码解读](二)MapReduce篇之Mapper类

  前面在讲InputFormat的时候,讲到了Mapper类是如何利用RecordReader来读取InputSplit中的K-V对的。

  这一篇里,开始对Mapper.class的子类进行解读。

[Hadoop源码解读](2)MapReduce篇之Mapper类

  先回忆一下。Mapper有setup(),map(),cleanup()和run()四个方法。其中setup()一般是用来进行一些map()前的准备工作,map()则一般承担主要的处理工作,cleanup()则是收尾工作或者执行map()后的K-V分发。run()方法提供了setup->map->cleanup()的执行模板。

  在MapReduce中,Mapper从一个输入分片中读取数据,然后经过Shuffle and Sort阶段,分发数据给Reducer,在Map端和Reduce端我们可能使用设置的Combiner进行合并,这在Reduce前进行。Partitioner控制每个K-V对应该被分发到哪个reducer[我们的Job可能有多个reducer],Hadoop默认使用HashPartitioner,HashPartitioner使用key的hashCode对reducer的数量取模得来。

  我们先来看看三个Mapper的子类,它们位于src\mapred\org\apache\hadoop\mapreduce\lib\map中。

  1、TokenCounterMapper

 

  private class MapRunner extends Thread {    private Mapper<K1,V1,K2,V2> mapper;    private Context subcontext;    private Throwable throwable;    MapRunner(Context context) throws IOException, InterruptedException {      mapper = ReflectionUtils.newInstance(mapClass,                                            context.getConfiguration());      subcontext = new Context(outer.getConfiguration(),                             outer.getTaskAttemptID(),                            new SubMapRecordReader(),                            new SubMapRecordWriter(),                             context.getOutputCommitter(),                            new SubMapStatusReporter(),                            outer.getInputSplit());    }    public Throwable getThrowable() {      return throwable;    }    @Override    public void run() {      try {        mapper.run(subcontext);      } catch (Throwable ie) {        throwable = ie;      }    }  }
  在MapRunner的Constructor中我们看见,MapRunner所包含的subcontext中使用了独立的RecordReader、RecordWriter和StatusReporter,它们分别是SubMapRecordReader、SubMapRecordWriter和SubMapStatusReporter,我们就不分析了。值得注意的是,SubMapRecordReader在读K-V对和SubMapRecordWriter在写K-V对的时候都要同步。这是通过互斥访问MultithreadedMapper的上下文outer来实现的。

  MultithreadedMapper适用于CPU密集型的任务,采用多个线程处理后,一个线程可以在另外的线程在执行时读取数据并执行,这样就使用了更多的CPU周期来执行任务,从而提高吞吐率。注意读写操作都是线程安全的,因此不难想象对于IO密集型的作业,采用MultithreadedMapper会适得其反,因为会有多个线程等待IO,IO成为限制吞吐率的关键。对于IO密集型的任务,我们应该采用增多task数量的方法来解决,因为这样在IO上就是并行的。

  除非map()的确是CPU密集型的,否则不推荐使用MultithreadedMapper,而建议采用更多的map task。


热点排行