
Hadoop HelloWorld Examples - Finding the k Nearest Points (+ custom types + passing parameters)

2013-09-05 
Once you understand how Map-Reduce works, it is easy to port common problems to Hadoop by analogy. This time I try the classic k-nearest-points problem. To dig a bit deeper into Hadoop's features, besides mapping the k-nearest-points algorithm onto map-reduce, I also try: (1) passing parameters to the map or reduce tasks through Hadoop's Configuration, (2) using a custom data type as a key/value, and (3) subclassing FileInputFormat and RecordReader to read data in my own format.
Problem statement: given a center point cpoint and a set of other points, compute the distance from each of those points to cpoint and output them sorted in ascending order of distance. (A single pass over the sorted list then yields the k nearest neighbors; that last step is omitted here.)
Map-Reduce algorithm: each map call receives one point from the input, computes the distance between that point and the center cpoint (passed in through Configuration), and emits the distance as the output key with the point's coordinates as the output value. After the intermediate shuffle & sort phase, the keys (the distances to cpoint) arrive at the reducer already sorted, so the reducer simply writes them out. (Note that this gives a globally sorted result when a single reducer is used.)
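For example, with cpoint = (0, 0), the point (3, 4) is emitted under the key sqrt(3^2 + 4^2) = 5.0, which is why the row "5.0  3.0,4.0" appears second in the sorted output below.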
Input data (the coordinates of each point):
  2 2
  3 4
  9 8
  20 10
  20 10
  15 14
  89 15

Output data (the first column is the distance to the specified point cpoint; the second column is the point's own coordinates. cpoint is specified and passed in through Configuration in the main function):
2.828427      2.0,2.0
5.0           3.0,4.0
12.0415945    9.0,8.0
20.518284     15.0,14.0
22.36068      20.0,10.0
22.36068      20.0,10.0
90.255196     89.0,15.0

Concretely, the Map's input key is the byte offset of the current line, and its input value is the point's 2D coordinates.

Code:

First, the custom class Point2D. For the Mapper and Reducer to accept a custom class as a key/value, it must implement the corresponding interface: WritableComparable (for keys) or Writable (for values), because map-reduce has its own serialize and deserialize mechanism.
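The original Point2D listing did not survive in this copy of the post, so below is a minimal sketch of what such a class could look like. The field names x and y and the "x,y" text form are taken from the driver code and the sample output; the float field types and the compareTo ordering are my assumptions.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// A 2D point that Hadoop can serialize, deserialize, and sort.
public class Point2D implements WritableComparable<Point2D> {
    public float x;
    public float y;

    public Point2D() {} // Hadoop needs a no-arg constructor for deserialization

    @Override
    public void write(DataOutput out) throws IOException { // serialize
        out.writeFloat(x);
        out.writeFloat(y);
    }

    @Override
    public void readFields(DataInput in) throws IOException { // deserialize
        x = in.readFloat();
        y = in.readFloat();
    }

    @Override
    public int compareTo(Point2D other) { // assumed ordering: by x, then y
        int cmp = Float.compare(x, other.x);
        return cmp != 0 ? cmp : Float.compare(y, other.y);
    }

    @Override
    public String toString() { // TextOutputFormat prints values with this, e.g. "2.0,2.0"
        return x + "," + y;
    }
}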

One additional note: most of the material I found claims that as long as a custom class implements these two interfaces, map and reduce will recognize and accept it as a key/value. But when I tried it in code, it did not work. Eclipse complained that because I was using the default TextInputFormat, the value had to be Text rather than my own Point2D. So I had to implement a custom FileInputFormat and RecordReader to parse my own data structure Point2D.
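The custom input format is likewise missing from this copy. Here is a minimal sketch of a KNPointInputFormat that delegates line splitting to Hadoop's LineRecordReader and parses each "x y" line into a Point2D; the class name matches the driver code below, but the implementation details are assumptions.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Reads one "x y" line at a time and emits (byte offset, Point2D).
public class KNPointInputFormat extends FileInputFormat<LongWritable, Point2D> {
    @Override
    public RecordReader<LongWritable, Point2D> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new KNPointRecordReader();
    }

    public static class KNPointRecordReader extends RecordReader<LongWritable, Point2D> {
        private final LineRecordReader lineReader = new LineRecordReader(); // reuse line splitting
        private final Point2D value = new Point2D();

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            lineReader.initialize(split, context);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!lineReader.nextKeyValue()) return false;
            Text line = lineReader.getCurrentValue();
            String[] xy = line.toString().trim().split("\\s+"); // "x y"
            value.x = Float.parseFloat(xy[0]);
            value.y = Float.parseFloat(xy[1]);
            return true;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return lineReader.getCurrentKey(); // byte offset of the current line
        }

        @Override
        public Point2D getCurrentValue() {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return lineReader.getProgress();
        }

        @Override
        public void close() throws IOException {
            lineReader.close();
        }
    }
}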

Finally, the main class with the Mapper, Reducer, and job driver:

import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;

public class KNPoints {

    // Mapper: emits (distance to cpoint, point) for every input point.
    public static class KNPointMapper extends Mapper<LongWritable, Point2D, FloatWritable, Point2D> {
        @Override
        public void map(LongWritable key, Point2D value, Context context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            // The center point that we want to calculate the others' distance from.
            String cPos = conf.get("cPoint");
            String[] cxy = cPos.split(" ");
            Point2D cpoint = new Point2D();
            cpoint.x = Integer.parseInt(cxy[0]);
            cpoint.y = Integer.parseInt(cxy[1]);
            float dis = (float) Math.sqrt(Math.pow((value.x - cpoint.x), 2.0f)
                    + Math.pow((value.y - cpoint.y), 2.0f));
            context.write(new FloatWritable(dis), value);
        }
    }

    // Reducer: keys arrive already sorted by distance; just write every point back out.
    public static class KNPointReducer extends Reducer<FloatWritable, Point2D, FloatWritable, Point2D> {
        @Override
        public void reduce(FloatWritable key, Iterable<Point2D> values, Context context)
                throws IOException, InterruptedException {
            for (Point2D val : values) {
                context.write(key, val);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/usr/local/hadoop/conf/core-site.xml"));
        // Define the center point. We calculate the others' distance to it.
        conf.set("cPoint", "0 0");

        Job job = new Job(conf);
        job.setInputFormatClass(KNPointInputFormat.class); // my own input format
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setJarByClass(KNPoints.class);
        job.setMapperClass(KNPointMapper.class);
        job.setReducerClass(KNPointReducer.class);
        job.setOutputKeyClass(FloatWritable.class);
        job.setOutputValueClass(Point2D.class);

        String in = "hdfs://localhost:9000/user/hadoop/input/data";
        String out = "hdfs://localhost:9000/user/hadoop/output";
        FileSystem fs = FileSystem.get(conf);
        fs.copyFromLocalFile(new Path("/home/hadoop/CodeSpace/KNPoints/data"),
                new Path("hdfs://localhost:9000/user/hadoop/input/"));
        FileInputFormat.addInputPath(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));
        if (fs.exists(new Path(out))) { // clear any previous output directory
            fs.delete(new Path(out), true);
        }
        job.waitForCompletion(true);
    }
}
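With Point2D, KNPointInputFormat, and KNPoints compiled into a single jar (say KNPoints.jar, a name assumed here), the job can be launched with "hadoop jar KNPoints.jar KNPoints". Note that the second output column relies on Point2D's toString(), and that main() hard-codes paths for a pseudo-distributed cluster at localhost:9000, so adjust those for your own setup.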

