首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 互联网 >

SleepJob源代码诠释

2012-10-30 
SleepJob源代码注释:package org.apache.hadoop.examples; import java.io.IOException; import java.io.Data……

SleepJob源代码注释

package org.apache.hadoop.examples;import java.io.IOException;import java.io.DataInput;import java.io.DataOutput;import java.util.Iterator;import java.util.Random;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.SequenceFile;import org.apache.hadoop.mapred.*;import org.apache.hadoop.mapred.lib.NullOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;/**hadoop的 map/reduce例子程序,在mapper 和reducer里休眠指定的时间。 * Dummy class for testing MR framefork. Sleeps for a defined period  * of time in mapper and reducer. Generates fake input for map / reduce  * jobs. Note that generated number of input pairs is in the order  * of <code>numMappers * mapSleepTime / 100</code>, so the job uses * some disk space. */public class SleepJob extends Configured implements Tool,               Mapper<IntWritable, IntWritable, IntWritable, NullWritable>,             Reducer<IntWritable, NullWritable, NullWritable, NullWritable>,             Partitioner<IntWritable,NullWritable> {  private long mapSleepDuration = 100; //map休眠的时间(毫秒)  private long reduceSleepDuration = 100;//reducer休眠的时间(毫秒)  private int mapSleepCount = 1; //map的数目  private int reduceSleepCount = 1;//reduce的数目  private int count = 0;  public int getPartition(IntWritable k, NullWritable v, int numPartitions) {    return k.get() % numPartitions;  }   /*自定义空的split*/  public static class EmptySplit implements InputSplit {    public void write(DataOutput out) throws IOException { }    public void readFields(DataInput in) throws IOException { }    public long getLength() { return 0L; }    public String[] getLocations() { return new String[0]; }  }/*自定义inputformat*/  public static class SleepInputFormat extends Configured      implements InputFormat<IntWritable,IntWritable> {    public 
InputSplit[] getSplits(JobConf conf, int numSplits) {      InputSplit[] ret = new InputSplit[numSplits];      for (int i = 0; i < numSplits; ++i) {        ret[i] = new EmptySplit();      }      return ret;    }    public RecordReader<IntWritable,IntWritable> getRecordReader(        InputSplit ignored, JobConf conf, Reporter reporter)        throws IOException {      final int count = conf.getInt("sleep.job.map.sleep.count", 1);      if (count < 0) throw new IOException("Invalid map count: " + count);      final int redcount = conf.getInt("sleep.job.reduce.sleep.count", 1);      if (redcount < 0)        throw new IOException("Invalid reduce count: " + redcount);      final int emitPerMapTask = (redcount * conf.getNumReduceTasks());    return new RecordReader<IntWritable,IntWritable>() {        private int records = 0;        private int emitCount = 0;        public boolean next(IntWritable key, IntWritable value)            throws IOException {          key.set(emitCount);          int emit = emitPerMapTask / count;          if ((emitPerMapTask) % count > records) {            ++emit;          }          emitCount += emit;          value.set(emit);          return records++ < count;        }        public IntWritable createKey() { return new IntWritable(); }        public IntWritable createValue() { return new IntWritable(); }        public long getPos() throws IOException { return records; }        public void close() throws IOException { }        public float getProgress() throws IOException {          return records / ((float)count);        }      };    }  }/*mapp方法*/  public void map(IntWritable key, IntWritable value,      OutputCollector<IntWritable, NullWritable> output, Reporter reporter)      throws IOException {    //it is expected that every map processes mapSleepCount number of records.     try {      reporter.setStatus("Sleeping... 
(" +          (mapSleepDuration * (mapSleepCount - count)) + ") ms left");      Thread.sleep(mapSleepDuration); //开始休眠    }    catch (InterruptedException ex) {      throw (IOException)new IOException(          "Interrupted while sleeping").initCause(ex);    }    ++count;    // output reduceSleepCount * numReduce number of random values, so that    // each reducer will get reduceSleepCount number of keys.    int k = key.get();    for (int i = 0; i < value.get(); ++i) {      output.collect(new IntWritable(k + i), NullWritable.get());    }  }/*reduce方法*/  public void reduce(IntWritable key, Iterator<NullWritable> values,      OutputCollector<NullWritable, NullWritable> output, Reporter reporter)      throws IOException {    try {      reporter.setStatus("Sleeping... (" +          (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");        Thread.sleep(reduceSleepDuration); //休眠时间          }    catch (InterruptedException ex) {      throw (IOException)new IOException(          "Interrupted while sleeping").initCause(ex);    }    count++;  }/*初始化参数*/  public void configure(JobConf job) {    this.mapSleepCount =      job.getInt("sleep.job.map.sleep.count", mapSleepCount);    this.reduceSleepCount =      job.getInt("sleep.job.reduce.sleep.count", reduceSleepCount);    this.mapSleepDuration =      job.getLong("sleep.job.map.sleep.time" , 100) / mapSleepCount;    this.reduceSleepDuration =      job.getLong("sleep.job.reduce.sleep.time" , 100) / reduceSleepCount;  }  public void close() throws IOException {  }  public static void main(String[] args) throws Exception{    int res = ToolRunner.run(new Configuration(), new SleepJob(), args);    System.exit(res);  }/*driver代码*/  public int run(int numMapper, int numReducer, long mapSleepTime,      int mapSleepCount, long reduceSleepTime,      int reduceSleepCount) throws IOException {    JobConf job = setupJobConf(numMapper, numReducer, mapSleepTime,                   mapSleepCount, reduceSleepTime, 
reduceSleepCount);    JobClient.runJob(job);    return 0;  }  public JobConf setupJobConf(int numMapper, int numReducer,                                 long mapSleepTime, int mapSleepCount,                                 long reduceSleepTime, int reduceSleepCount) {    JobConf job = new JobConf(getConf(), SleepJob.class);    job.setNumMapTasks(numMapper);//设置map的数目    job.setNumReduceTasks(numReducer);//设置reduce数目    job.setMapperClass(SleepJob.class);//设置map的类    job.setMapOutputKeyClass(IntWritable.class);//设置map的输出中间结果key类型    job.setMapOutputValueClass(NullWritable.class);//设置map的输出中间结果value的类型    job.setReducerClass(SleepJob.class);//设置reduce的类    job.setOutputFormat(NullOutputFormat.class);//设置输出类型    job.setInputFormat(SleepInputFormat.class);//设置输入类型    job.setPartitionerClass(SleepJob.class);//设置partition类型    job.setSpeculativeExecution(false);//关闭speculativeexcution属性    job.setJobName("Sleep job");    FileInputFormat.addInputPath(job, new Path("ignored"));    job.setLong("sleep.job.map.sleep.time", mapSleepTime);    job.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);    job.setInt("sleep.job.map.sleep.count", mapSleepCount);    job.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);    return job;  }  public int run(String[] args) throws Exception {    if(args.length < 1) {      System.err.println("SleepJob [-m numMapper] [-r numReducer]" +          " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +          " [-recordt recordSleepTime (msec)]");      ToolRunner.printGenericCommandUsage(System.err);      return -1;    }    int numMapper = 1, numReducer = 1;    long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100;    int mapSleepCount = 1, reduceSleepCount = 1;    for(int i=0; i < args.length; i++ ) {      if(args[i].equals("-m")) {        numMapper = Integer.parseInt(args[++i]);      }      else if(args[i].equals("-r")) {        numReducer = Integer.parseInt(args[++i]);      }      else if(args[i].equals("-mt")) { 
       mapSleepTime = Long.parseLong(args[++i]);      }      else if(args[i].equals("-rt")) {        reduceSleepTime = Long.parseLong(args[++i]);      }      else if (args[i].equals("-recordt")) {        recSleepTime = Long.parseLong(args[++i]);      }    }        // sleep for *SleepTime duration in Task by recSleepTime per record    mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));    reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));        return run(numMapper, numReducer, mapSleepTime, mapSleepCount,        reduceSleepTime, reduceSleepCount);  }}
?

热点排行