首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 互联网 >

mapreduce排序(自定义Partition)

2013-09-28 
mapreduce排序(自定义Partition):import java.io.IOException; import org.apache.hadoop.conf.Configuration; …

mapreduce排序(自定义Partition)

import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Partitioner;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/** * mapreduce排序 *  * @author asheng file1:1.txt 2 32 654 32 15 756 65223 *  *         file2:2.txt 5956 22 650 92 *  *         file3:3.txt 26 54 6 *         对file1,file2,file3进行排序,能够第一想到的便是mapreduce自动排序,但是这里面有问题: *         Reduce排序只是对发送到自己所在的节点的数据进行排序,不能保证整体的顺序 *         所以这里要自定义Partition,保证Partition后,Reduce上的数据在整体上是有序的,然后在reduce内进行排序 */public class Sort {public static class Map extendsMapper<Object, Text, IntWritable, IntWritable> {private IntWritable data = new IntWritable();public void map(Object key, Text value, Context context)throws IOException, InterruptedException {data.set(Integer.parseInt(value.toString()));context.write(data, new IntWritable(1));}}public static class Reduce extendsReducer<IntWritable, IntWritable, IntWritable, IntWritable> {private IntWritable data = new IntWritable(1);public void reduce(IntWritable key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {for (IntWritable v : values) {System.out.println(v);context.write(data, key);data = new IntWritable(data.get() + 1);}}}public static class Partition extends Partitioner<IntWritable, IntWritable> {@Overridepublic int getPartition(IntWritable key, IntWritable value,int numPartitions) {int Maxnumber = 65223;int bound = Maxnumber / numPartitions + 1;int keynumber = key.get();for (int i = 0; i < numPartitions; i++) {if (keynumber < bound * i && keynumber >= bound * (i - 1)) {return i - 1;}}return 0;}}public static void main(String[] args) throws IOException,InterruptedException, 
ClassNotFoundException {Configuration conf = new Configuration();Job job = new Job(conf, "sort");job.setJarByClass(Sort.class);job.setMapperClass(Map.class);job.setReducerClass(Reduce.class);job.setPartitionerClass(Partition.class);job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(IntWritable.class);FileInputFormat.setInputPaths(job, "/home/asheng/hadoop/in");FileOutputFormat.setOutputPath(job, new Path("/home/asheng/hadoop/out"));job.waitForCompletion(true);}}

?

热点排行