  不同的InputFormat都会按自己的实现来读取输入数据并产生输入分片,一个输入分片会被单独的map task作为数据源。下面我们先看看这些输入分片(inputSplit)是什么样的。



public class NLineInputFormat extends FileInputFormat<LongWritable, Text> {   public static final String LINES_PER_MAP =     "mapreduce.input.lineinputformat.linespermap";  public RecordReader<LongWritable, Text> createRecordReader(      InputSplit genericSplit, TaskAttemptContext context)       throws IOException {    context.setStatus(genericSplit.toString());    return new LineRecordReader();  }  /**    * Logically splits the set of input files for the job, splits N lines   * of the input as one split.   *    * @see FileInputFormat#getSplits(JobContext)   */  public List<InputSplit> getSplits(JobContext job)  throws IOException {    List<InputSplit> splits = new ArrayList<InputSplit>();    int numLinesPerSplit = getNumLinesPerSplit(job);    for (FileStatus status : listStatus(job)) {      splits.addAll(getSplitsForFile(status,        job.getConfiguration(), numLinesPerSplit));    }    return splits;  }    public static List<FileSplit> getSplitsForFile(FileStatus status,      Configuration conf, int numLinesPerSplit) throws IOException {    List<FileSplit> splits = new ArrayList<FileSplit> ();    Path fileName = status.getPath();    if (status.isDir()) {      throw new IOException("Not a file: " + fileName);    }    FileSystem  fs = fileName.getFileSystem(conf);    LineReader lr = null;    try {      FSDataInputStream in  = fs.open(fileName);      lr = new LineReader(in, conf);      Text line = new Text();      int numLines = 0;      long begin = 0;      long length = 0;      int num = -1;      while ((num = lr.readLine(line)) > 0) {        numLines++;        length += num;        if (numLines == numLinesPerSplit) {          // NLineInputFormat uses LineRecordReader, which always reads          // (and consumes) at least one character out of its upper split          // boundary. So to make sure that each mapper gets N lines, we          // move back the upper split limits of each split           // by one character here.          if (begin == 0) {            splits.add(new FileSplit(fileName, begin, length - 1,              new String[] {}));          } else {            splits.add(new FileSplit(fileName, begin - 1, length,              new String[] {}));          }          begin += length;          length = 0;          numLines = 0;        }      }      if (numLines != 0) {        splits.add(new FileSplit(fileName, begin, length, new String[]{}));      }    } finally {      if (lr != null) {        lr.close();      }    }    return splits;   }    /**   * Set the number of lines per split   * @param job the job to modify   * @param numLines the number of lines per split   */  public static void setNumLinesPerSplit(Job job, int numLines) {    job.getConfiguration().setInt(LINES_PER_MAP, numLines);  }  /**   * Get the number of lines per split   * @param job the job   * @return the number of lines per split   */  public static int getNumLinesPerSplit(JobContext job) {    return job.getConfiguration().getInt(LINES_PER_MAP, 1);  }

