
Hadoop MapReduce Study Notes (3): Implementing SQL-like SELECT MAX(ID) in MapReduce

2012-06-30 

This is an original post; please credit the source when reposting: http://guoyunsky.iteye.com/blog/1233718

Welcome to join the Hadoop super group: 180941958

This blog has moved to my standalone site: http://www.yun5u.com/articles/hadoop-mapreduce-sql-max.html

Please read these first:

1. Hadoop MapReduce Study Notes (1): Preface and Preparation

2. Hadoop MapReduce Study Notes (2): Preface and Preparation, Part 2

Next post: Hadoop MapReduce Study Notes (4): Implementing SQL-like SELECT MAX(ID) in MapReduce, Part 2: Some Improvements


This post finds the maximum of a large set of numbers, the equivalent of the SQL query SELECT MAX(NUMBER) FROM TABLE, using a simple MapReduce job. The program generates its own test data and records the maximum while generating it, so once the job has finished you can inspect the output path and compare the two results. The full code follows.
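For concreteness, here is a hypothetical example (the actual numbers depend on the generated data). The input is plain text with one number per line; the reducer that receives the single key writes one tab-separated line, and with two reduce tasks the other part file stays empty.

Sample input:
    7302
    15
    9998731

Sample output (in one of the part-r-* files):
    maxValue	9998731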


package com.guoyun.hadoop.mapreduce.study;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Find the maximum value, similar to SQL: SELECT MAX(NUMBER) FROM TABLE.
 * Note that the input has only a single column.
 * Compared with @GetMaxValueMapReduceImproveTest, this version is slower.
 */
public class GetMaxValueMapReduceTest extends MyMapReduceSIngleColumnTest {
  public static final Logger log = LoggerFactory.getLogger(GetMaxValueMapReduceTest.class);

  public GetMaxValueMapReduceTest(int dataLength) throws Exception {
    super(dataLength);
  }

  public GetMaxValueMapReduceTest(long dataLength, String inputPath, String outputPath)
      throws Exception {
    super(dataLength, inputPath, outputPath);
  }

  public GetMaxValueMapReduceTest(String outputPath) {
    super(outputPath);
  }

  /**
   * Map: parse the source data and emit every number under a single shared key,
   * so that one reduce call sees all of them.
   */
  public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    private final Text writeKey = new Text("K");
    private LongWritable writeValue = new LongWritable(0);

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      log.debug("begin to map");
      StringTokenizer tokenizer = new StringTokenizer(value.toString().trim());
      while (tokenizer.hasMoreTokens()) {
        String lineValue = tokenizer.nextToken().trim();
        if (lineValue.equals("")) {
          continue;
        }
        try {
          writeValue.set(Long.parseLong(lineValue));
          context.write(writeKey, writeValue);
        } catch (NumberFormatException e) {
          // Skip tokens that are not valid long values.
          continue;
        }
      }
    }
  }

  /**
   * Reduce: scan all values of the key and keep the maximum.
   */
  public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private final Text maxValueKey = new Text("maxValue");

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      log.debug("begin to reduce");
      long maxValue = Long.MIN_VALUE;
      for (LongWritable value : values) {
        if (value.get() > maxValue) {
          maxValue = value.get();
        }
      }
      context.write(maxValueKey, new LongWritable(maxValue));
    }
  }

  public static void main(String[] args) {
    MyMapReduceTest mapReduceTest = null;
    Configuration conf = null;
    Job job = null;
    FileSystem fs = null;
    Path inputPath = null;
    Path outputPath = null;
    long begin = 0;
    String output = "testDatas/mapreduce/MROutput_SingleColumn_getMax"; // unused in this version

    try {
      // Generate 10,000,000 test numbers; the expected maximum is recorded during generation.
      mapReduceTest = new GetMaxValueMapReduceTest(10000000);

      inputPath = new Path(mapReduceTest.getInputPath());
      outputPath = new Path(mapReduceTest.getOutputPath());

      conf = new Configuration();
      job = new Job(conf, "getMaxValue");

      // Remove output left over from a previous run, otherwise the job refuses to start.
      fs = FileSystem.getLocal(conf);
      if (fs.exists(outputPath)) {
        if (!fs.delete(outputPath, true)) {
          System.err.println("Delete output file:" + mapReduceTest.getOutputPath() + " failed!");
          return;
        }
      }

      job.setJarByClass(GetMaxValueMapReduceTest.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(LongWritable.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(LongWritable.class);
      job.setMapperClass(MyMapper.class);
      job.setReducerClass(MyReducer.class);

      job.setNumReduceTasks(2);

      FileInputFormat.addInputPath(job, inputPath);
      FileOutputFormat.setOutputPath(job, outputPath);

      begin = System.currentTimeMillis();
      job.waitForCompletion(true);

      System.out.println("===================================================");
      if (mapReduceTest.isGenerateDatas()) {
        System.out.println("The maxValue is:" + mapReduceTest.getMaxValue());
        System.out.println("The minValue is:" + mapReduceTest.getMinValue());
      }
      System.out.println("Spend time:" + (System.currentTimeMillis() - begin));
      // Spend time:18908
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
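One inefficiency is worth pointing out (the follow-up note covers the author's actual improvements, which may differ from this sketch): because every number is emitted under the single key "K", the entire data set is shuffled to one reduce call, and with job.setNumReduceTasks(2) the second reducer receives nothing and writes an empty part file. The standard remedy is a combiner that pre-computes a per-map-task maximum, so each map task shuffles only one record. Note that MyReducer cannot be reused directly as the combiner, since it rewrites the key to "maxValue" while Hadoop may apply a combiner zero or more times. A hypothetical MyCombiner (not from the original post) would be added inside GetMaxValueMapReduceTest, reusing its imports:

  /**
   * Hypothetical combiner sketch: keeps only the per-map-task maximum so that
   * each map task shuffles a single record instead of millions.
   */
  public static class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      long maxValue = Long.MIN_VALUE;
      for (LongWritable value : values) {
        if (value.get() > maxValue) {
          maxValue = value.get();
        }
      }
      // Re-emit the incoming key ("K") unchanged: a combiner may run zero, one,
      // or many times, so combined and uncombined records must share the same key.
      context.write(key, new LongWritable(maxValue));
    }
  }

and registered in main() next to the reducer:

  job.setCombinerClass(MyCombiner.class);

This only illustrates the general technique; see the next note for the improvements actually made in this series.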
