首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 数据库 > SQL Server >

Hadoop MapReduce 学习札记(五) MapReduce实现类似SQL的max和min

2012-07-31 
Hadoop MapReduce 学习笔记(五) MapReduce实现类似SQL的max和min? 本博客属原创文章,转载请注明出处:http:

Hadoop MapReduce 学习笔记(五) MapReduce实现类似SQL的max和min

? 本博客属原创文章,转载请注明出处:http://guoyunsky.iteye.com/blog/1233726

? 欢迎加入Hadoop超级群:?180941958

?????? 本博客已迁移到本人独立博客:http://www.yun5u.com/articles/hadoop-mapreduce-sql-max-min.html

?????? 请先阅读:??????????

?????????? 1.Hadoop MapReduce 学习笔记(一) 序言和准备

?????????? 2.Hadoop MapReduce 学习笔记(二) 序言和准备 2

?????????? 3.Hadoop MapReduce 学习笔记(三) MapReduce实现类似SQL的SELECT MAX(ID)

?????????? 4.Hadoop MapReduce 学习笔记(四) MapReduce实现类似SQL的SELECT MAX(ID) 2 一些改进

?

??? 下一篇:Hadoop MapReduce 学习笔记(六) MapReduce实现类似SQL的max和min? 正确写法

?

??????? Hadoop MapReduce 学习笔记(四) MapReduce实现类似SQL的SELECT MAX(ID) 2 一些改进只是找出一列中的最大值,但我又想找出最小值,或者平均,或者一列的总和呢.这里也就是想多输出几个结果,之前只是一个.MapReduce该如何实现呢?具体请看代码吧:但这里是一个错误的实现,注意,输出单个值跟输出多个值的Map和Reduce写法是不一样的.

?

?

package com.guoyun.hadoop.mapreduce.study;import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.slf4j.Logger;import org.slf4j.LoggerFactory;/** * 或得最大和最小值,类似SQL:SELECT MAX(NUMBER),MIN(NUMBER) FROM TABLE * 注意:这里只有一列数据,多列请查看 @GetMaxAndMinValueMultiMapReduceTest *  * 这是个错误的写法,结果类似: *  maxValue 10000000    minValue  9999999    maxValue  9999955    minValue  9223372036854775807    maxValue  119    minValue  9223372036854775807    maxValue  9999889    minValue  9223372036854775807    ... * 会有多个maxValue和minValue * 正确的写法请参考 @GetMaxAndMinValueMapReduceFixTest */public class GetMaxAndMinValueMapReduceTest extends MyMapReduceSIngleColumnTest{    public static final Logger log=LoggerFactory.getLogger(GetMaxAndMinValueMapReduceTest.class);    public GetMaxAndMinValueMapReduceTest(String outputPath) {    super(outputPath);    // TODO Auto-generated constructor stub  }    /**   * Map,to get the source datas   */  public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{    private final Text writeKey=new Text("K");    private LongWritable writeValue=new LongWritable(0);        @Override    protected void map(LongWritable key, Text value, Context context)        throws IOException, InterruptedException {      log.debug("begin to map");      StringTokenizer tokenizer=null;      String lineValue=null;                  tokenizer=new StringTokenizer(value.toString().trim());      while(tokenizer.hasMoreTokens()){        lineValue=tokenizer.nextToken().trim();        if(lineValue.equals("")){          continue;        }        try {          writeValue.set(Long.parseLong(lineValue));          context.write(writeKey, writeValue);        } catch (NumberFormatException e) {          continue;        }              }    }  }    public static class MyCombiner    extends Reducer<Text,LongWritable,Text,LongWritable>{    private final Text maxValueKey=new Text("maxValue");    private final Text minValueKey=new Text("minValue");        @Override    public void reduce(Text key, Iterable<LongWritable> values,Context context)        throws IOException, InterruptedException {      log.debug("begin to combine");      long maxValue=Long.MIN_VALUE;      long minValue=Long.MAX_VALUE;      long valueTmp=0;      LongWritable writeValue=new LongWritable(0);       for(LongWritable value:values){        valueTmp=value.get();        if(valueTmp>maxValue){          maxValue=valueTmp;        }else if(valueTmp<minValue){          minValue=valueTmp;        }      }      writeValue.set(maxValue);      context.write(maxValueKey, writeValue);      writeValue.set(minValue);      context.write(minValueKey, writeValue);    }       }      /**   * Reduce,to get the max value   */  public static class MyReducer     extends Reducer<Text,LongWritable,Text,LongWritable>{    private final Text maxValueKey=new Text("maxValue");    private final Text minValueKey=new Text("minValue");            @Override    public void reduce(Text key, Iterable<LongWritable> values,Context context)        throws IOException, InterruptedException {      log.debug("begin to reduce");      long maxValue=Long.MIN_VALUE;      long minValue=Long.MAX_VALUE;      long valueTmp=0;      LongWritable writeValue=new LongWritable(0);       System.out.println(key.toString());      for(LongWritable value:values){        valueTmp=value.get();        if(valueTmp>maxValue){          maxValue=valueTmp;        }else if(valueTmp<minValue){          minValue=valueTmp;        }      }      writeValue.set(maxValue);      context.write(maxValueKey, writeValue);      writeValue.set(minValue);      context.write(minValueKey, writeValue);    }   }    /**   * @param args   */  public static void main(String[] args) {    MyMapReduceTest mapReduceTest=null;    Configuration conf=null;    Job job=null;    FileSystem fs=null;    Path inputPath=null;    Path outputPath=null;    long begin=0;    String output="testDatas/mapreduce/MROutput_SingleColumn_getMaxAndMin";            try {      mapReduceTest=new GetMaxAndMinValueMapReduceTest(output);            inputPath=new Path(mapReduceTest.getInputPath());      outputPath=new Path(mapReduceTest.getOutputPath());            conf=new Configuration();      job=new Job(conf,"getMaxAndMinValue");            fs=FileSystem.getLocal(conf);      if(fs.exists(outputPath)){        if(!fs.delete(outputPath,true)){          System.err.println("Delete output file:"+mapReduceTest.getOutputPath()+" failed!");          return;        }      }                  job.setJarByClass(GetMaxAndMinValueMapReduceTest.class);      job.setMapOutputKeyClass(Text.class);      job.setMapOutputValueClass(LongWritable.class);      job.setOutputKeyClass(Text.class);      job.setOutputValueClass(LongWritable.class);      job.setMapperClass(MyMapper.class);      job.setCombinerClass(MyCombiner.class);      job.setReducerClass(MyReducer.class);            job.setNumReduceTasks(2);            FileInputFormat.addInputPath(job, inputPath);      FileOutputFormat.setOutputPath(job, outputPath);                  begin=System.currentTimeMillis();      job.waitForCompletion(true);            System.out.println("===================================================");      if(mapReduceTest.isGenerateDatas()){        System.out.println("The maxValue is:"+mapReduceTest.getMaxValue());        System.out.println("The minValue is:"+mapReduceTest.getMinValue());      }      System.out.println("Spend time:"+(System.currentTimeMillis()-begin));      // Spend time:12334          } catch (Exception e) {      // TODO Auto-generated catch block      e.printStackTrace();    }      }}

热点排行