
[Hadoop Source Code Walkthrough] (1) MapReduce: InputFormat

2012-09-10 


  When we write a MapReduce program, we usually call something like job.setInputFormatClass(KeyValueTextInputFormat.class); while configuring the job, to make sure the input files are read in the format we want. All input formats inherit from InputFormat, an abstract class whose subclasses include FileInputFormat, which reads ordinary files, DBInputFormat, which reads from databases, and so on.
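  For context, this is the kind of driver where that call appears. A minimal sketch against the 2012-era (Hadoop 1.x) new API; the class name MyDriver and the argument handling are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyDriver {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "kv-demo");
    job.setJarByClass(MyDriver.class);

    // Read every input line as a tab-separated key/value pair of Text objects.
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}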


  Each InputFormat implementation reads the input data in its own way and produces input splits; every input split then serves as the data source for a single map task. The contract behind all of this is small, as the sketch below shows.
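  Abridged from the Hadoop source, the abstract class org.apache.hadoop.mapreduce.InputFormat declares exactly two methods: one that computes the logical splits, and one that builds a record reader for a split.

public abstract class InputFormat<K, V> {

  // Compute the list of logical splits for the job; the framework
  // launches one map task per split.
  public abstract List<InputSplit> getSplits(JobContext context)
      throws IOException, InterruptedException;

  // Build the RecordReader that parses one split into a stream of
  // key-value records for the Mapper.
  public abstract RecordReader<K, V> createRecordReader(
      InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException;
}

  With that contract in mind, let us first look at what these input splits (InputSplit) actually are.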

InputSplit:

  As we know, a Mapper's input arrives one input split (InputSplit) at a time. InputSplit is an abstract class; logically, it represents all the key-value pairs that will be fed to the Mapper processing that split.
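  The abstract class itself (org.apache.hadoop.mapreduce.InputSplit) only describes a slice of the input; it does not hold the data. Abridged from the Hadoop source:

public abstract class InputSplit {

  // Size of the split in bytes; the framework sorts splits by length
  // so that the largest ones are scheduled first.
  public abstract long getLength() throws IOException, InterruptedException;

  // Hostnames where the split's bytes are stored, used as locality
  // hints when scheduling the map task.
  public abstract String[] getLocations() throws IOException, InterruptedException;
}

  As a concrete example of how an input format turns files into splits, here is the source of NLineInputFormat, which packs N lines of input into each split (keys are byte offsets, values are the lines) so that every mapper receives exactly N lines: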

package org.apache.hadoop.mapreduce.lib.input;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.LineReader;

public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {

  public static final String LINES_PER_MAP =
    "mapreduce.input.lineinputformat.linespermap";

  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit genericSplit, TaskAttemptContext context)
      throws IOException {
    context.setStatus(genericSplit.toString());
    return new LineRecordReader();
  }

  /**
   * Logically splits the set of input files for the job, splits N lines
   * of the input as one split.
   *
   * @see FileInputFormat#getSplits(JobContext)
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>();
    int numLinesPerSplit = getNumLinesPerSplit(job);
    // Split every input file independently, N lines at a time.
    for (FileStatus status : listStatus(job)) {
      splits.addAll(getSplitsForFile(status,
        job.getConfiguration(), numLinesPerSplit));
    }
    return splits;
  }

  public static List<FileSplit> getSplitsForFile(FileStatus status,
      Configuration conf, int numLinesPerSplit) throws IOException {
    List<FileSplit> splits = new ArrayList<FileSplit>();
    Path fileName = status.getPath();
    if (status.isDir()) {
      throw new IOException("Not a file: " + fileName);
    }
    FileSystem fs = fileName.getFileSystem(conf);
    LineReader lr = null;
    try {
      FSDataInputStream in = fs.open(fileName);
      lr = new LineReader(in, conf);
      Text line = new Text();
      int numLines = 0;
      long begin = 0;
      long length = 0;
      int num = -1;
      while ((num = lr.readLine(line)) > 0) {
        numLines++;
        length += num;
        if (numLines == numLinesPerSplit) {
          // NLineInputFormat uses LineRecordReader, which always reads
          // (and consumes) at least one character out of its upper split
          // boundary. So to make sure that each mapper gets N lines, we
          // move back the upper split limits of each split
          // by one character here.
          if (begin == 0) {
            splits.add(new FileSplit(fileName, begin, length - 1,
              new String[] {}));
          } else {
            splits.add(new FileSplit(fileName, begin - 1, length,
              new String[] {}));
          }
          begin += length;
          length = 0;
          numLines = 0;
        }
      }
      // The last, possibly shorter, group of lines becomes its own split.
      if (numLines != 0) {
        splits.add(new FileSplit(fileName, begin, length, new String[]{}));
      }
    } finally {
      if (lr != null) {
        lr.close();
      }
    }
    return splits;
  }

  /**
   * Set the number of lines per split
   * @param job the job to modify
   * @param numLines the number of lines per split
   */
  public static void setNumLinesPerSplit(Job job, int numLines) {
    job.getConfiguration().setInt(LINES_PER_MAP, numLines);
  }

  /**
   * Get the number of lines per split
   * @param job the job
   * @return the number of lines per split
   */
  public static int getNumLinesPerSplit(JobContext job) {
    return job.getConfiguration().getInt(LINES_PER_MAP, 1);
  }
}
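  Two details are worth calling out. First, wiring this format into a job takes only two lines; a minimal fragment, assuming a Job named job as in the driver sketch above (the value 1000 is hypothetical):

    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.setNumLinesPerSplit(job, 1000);  // 1000 lines per mapper

  Second, note the off-by-one adjustment in getSplitsForFile: because LineRecordReader always reads (and consumes) at least one character past its upper split boundary, every split's upper limit is pulled back by one character (length - 1 for the first split, begin - 1 with an unchanged length for the rest); without this, a mapper could receive N+1 lines.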


  With this, we have a concrete picture of Hadoop's input formats and of how they are used inside MapReduce.
