首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 互联网 >

Sort源代码诠释

2012-11-09 
Sort源代码注释package org.apache.hadoop.examplesimport java.io.IOExceptionimport java.net.URIimp

Sort源代码注释

package org.apache.hadoop.examples;import java.io.IOException;import java.net.URI;import java.util.*;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.filecache.DistributedCache;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.BytesWritable;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.mapred.*;import org.apache.hadoop.mapred.lib.IdentityMapper;import org.apache.hadoop.mapred.lib.IdentityReducer;import org.apache.hadoop.mapred.lib.InputSampler;import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;/** hadoop 的map/reduce例子,排序,由于map传给reduce的中间结果是排序的,所以这个例子不用写mapper和reducer。都用默认的map/reduce的实现,IdentityMapper和IdentityReducer。例子中可以用排序采样器TotalOrderPartitioner,参数设置可以是 -totalOrder 0.1 10000 10 * This is the trivial map/reduce program that does absolutely nothing * other than use the framework to fragment and sort the input values. * * To run: bin/hadoop jar build/hadoop-examples.jar sort *            [-m <i>maps</i>] [-r <i>reduces</i>] *            [-inFormat <i>input format class</i>]  *            [-outFormat <i>output format class</i>]  *            [-outKey <i>output key class</i>]  *            [-outValue <i>output value class</i>]  *            [-totalOrder <i>pcnt</i> <i>num samples</i> <i>max splits</i>] *            <i>in-dir</i> <i>out-dir</i>  */public class Sort<K,V> extends Configured implements Tool {  private RunningJob jobResult = null;  static int printUsage() {    System.out.println("sort [-m <maps>] [-r <reduces>] " +                       "[-inFormat <input format class>] " +                       "[-outFormat <output format class>] " +                        "[-outKey <output key class>] " +                       "[-outValue <output value class>] " +                       "[-totalOrder <pcnt> <num samples> <max splits>] " +                       "<input> <output>");    ToolRunner.printGenericCommandUsage(System.out);    return -1;  }  /**driver代码   * The main driver for sort program.   * Invoke this method to submit the map/reduce job.   * @throws IOException When there is communication problems with the    *                     job tracker.   */  public int run(String[] args) throws Exception {    JobConf jobConf = new JobConf(getConf(), Sort.class);    jobConf.setJobName("sorter");    jobConf.setMapperClass(IdentityMapper.class);  //设置mapper          jobConf.setReducerClass(IdentityReducer.class);//设置reducer    JobClient client = new JobClient(jobConf);    ClusterStatus cluster = client.getClusterStatus();//获得集群的状态    int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);    String sort_reduces = jobConf.get("test.sort.reduces_per_host");    if (sort_reduces != null) {       num_reduces = cluster.getTaskTrackers() *                        Integer.parseInt(sort_reduces);    }    Class<? extends InputFormat> inputFormatClass =       SequenceFileInputFormat.class;    Class<? extends OutputFormat> outputFormatClass =       SequenceFileOutputFormat.class;    Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;    Class<? extends Writable> outputValueClass = BytesWritable.class;    List<String> otherArgs = new ArrayList<String>();    InputSampler.Sampler<K,V> sampler = null;    for(int i=0; i < args.length; ++i) {      try {        if ("-m".equals(args[i])) {          jobConf.setNumMapTasks(Integer.parseInt(args[++i]));        } else if ("-r".equals(args[i])) {          num_reduces = Integer.parseInt(args[++i]);        } else if ("-inFormat".equals(args[i])) {          inputFormatClass =             Class.forName(args[++i]).asSubclass(InputFormat.class);        } else if ("-outFormat".equals(args[i])) {          outputFormatClass =             Class.forName(args[++i]).asSubclass(OutputFormat.class);        } else if ("-outKey".equals(args[i])) {          outputKeyClass =             Class.forName(args[++i]).asSubclass(WritableComparable.class);        } else if ("-outValue".equals(args[i])) {          outputValueClass =             Class.forName(args[++i]).asSubclass(Writable.class);        } else if ("-totalOrder".equals(args[i])) { //设置采样器3个参数          double pcnt = Double.parseDouble(args[++i]);          int numSamples = Integer.parseInt(args[++i]);          int maxSplits = Integer.parseInt(args[++i]);          if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;          sampler =            new InputSampler.RandomSampler<K,V>(pcnt, numSamples, maxSplits);        } else {          otherArgs.add(args[i]);        }      } catch (NumberFormatException except) {        System.out.println("ERROR: Integer expected instead of " + args[i]);        return printUsage();      } catch (ArrayIndexOutOfBoundsException except) {        System.out.println("ERROR: Required parameter missing from " +            args[i-1]);        return printUsage(); // exits      }    }    // Set user-supplied (possibly default) job configs    jobConf.setNumReduceTasks(num_reduces);    jobConf.setInputFormat(inputFormatClass);    jobConf.setOutputFormat(outputFormatClass);    jobConf.setOutputKeyClass(outputKeyClass);    jobConf.setOutputValueClass(outputValueClass);    // Make sure there are exactly 2 parameters left.    if (otherArgs.size() != 2) {      System.out.println("ERROR: Wrong number of parameters: " +          otherArgs.size() + " instead of 2.");      return printUsage();    }    FileInputFormat.setInputPaths(jobConf, otherArgs.get(0));    FileOutputFormat.setOutputPath(jobConf, new Path(otherArgs.get(1)));    if (sampler != null) {      System.out.println("Sampling input to effect total-order sort...");      jobConf.setPartitionerClass(TotalOrderPartitioner.class);//设置采样器      Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];      inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf));      Path partitionFile = new Path(inputDir, "_sortPartitioning");      TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);//采样设置采样文件      InputSampler.<K,V>writePartitionFile(jobConf, sampler);      URI partitionUri = new URI(partitionFile.toString() +                                 "#" + "_sortPartitioning");      DistributedCache.addCacheFile(partitionUri, jobConf);      DistributedCache.createSymlink(jobConf);    }    System.out.println("Running on " +        cluster.getTaskTrackers() +        " nodes to sort from " +         FileInputFormat.getInputPaths(jobConf)[0] + " into " +        FileOutputFormat.getOutputPath(jobConf) +        " with " + num_reduces + " reduces.");    Date startTime = new Date();    System.out.println("Job started: " + startTime);    jobResult = JobClient.runJob(jobConf);    Date end_time = new Date();    System.out.println("Job ended: " + end_time);    System.out.println("The job took " +         (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");    return 0;  }  public static void main(String[] args) throws Exception {    int res = ToolRunner.run(new Configuration(), new Sort(), args);    System.exit(res);  }  /**   * Get the last job that was run using this instance.   * @return the results of the last job that was run   */  public RunningJob getResult() {    return jobResult;  }}
?

热点排行