首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

基于mapreduce的Hadoop join兑现分析(一)

2012-06-30 
基于mapreduce的Hadoop join实现分析(一)对于一个大数据的分析应用,join是必不可少的一项功能.现在很多构

基于mapreduce的Hadoop join实现分析(一)

对于一个大数据的分析应用,join是必不可少的一项功能.现在很多构建与hadoop之上的应用,如Hive,PIG等在其内部实现了join程序,可以通过很简单的sql语句或者数据操控脚本完成相应的Join工作.那么join应该如何实现呢?今天我们就对join做一个简单的实现.

我们来看一个例子,现在有两组数据:一组为单位人员信息,如下:

人员ID

+ numOutside.toString());

}

reader.close();

}

?

}

程序主体很简单,开始将输出目录删除,中间进行一系列的JobConf设定工作,将输出格式设为SequenceFile,最后读出程序结果到控制台.接下来我们看看Mapper的实现:

import java.io.IOException;

?

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.io.*;

?

public class JoinMapper extends MapReduceBase?

implements Mapper<LongWritable, Text, LongWritable, Record> {

?

public void map(LongWritable key, Text value,

OutputCollector<LongWritable, Record> output, Reporter reporter)

throws IOException {

String line = value.toString();

String[] values = line.split(",");

if(values.length == 2){ //这里使用记录的长度来区别地址信息与人员信息,当然可以通过其他方式(如文件名等)来实现

Record reco = new Record();

reco.locId = values[0];

reco.type = 2;

reco.locationName = values[1];+ reco.toString());

}

}

?

for(Record e : employees){

e.locationName = thisLocation.locationName;

output.collect(new LongWritable(0), new Text(e.toString()));

}

System.out.println("+++++++++++++++");

}

}

在reducer端,我们先构造了一个地址对象,thisLocation用来保存地址信息.在reducer的迭代器values中,如果某个value是地址,就将其保存到thisLocation中.否则就将人员信息加入到List中以供后面打印.

这个reducer中有两点需要非常注意:

一,在while (values.hasNext())的循环中的thisLocation = new Record(reco)以及Record recoClone = new Record(reco)语句,我们不能直接保存reducer的迭代器中的对象,因为迭代器中每次返回的对象都是同一个Object,但是具有不同的值.注意,一定要注意.

二,这个是一个比较蹩脚的reduce实现,从程序中我们可以看到.我们用了一个List来保存某个地址ID的所有人员信息,对于一个非常巨大的应用来说,某个地址ID可能具有大于List长度的人员信息,这就会造成List溢出.下次对该程序进行优化从而能够避免该现象.

好啦,看看数据和程序的运行结果吧!

$ ./hadoop fs -cat input/join/names

1,张三,1

2,李四,2

3,王五,1

4,赵六,3

5,马七,3

?

$ ./hadoop fs -cat input/join/locations

1,北京

2,上海

3,广州

?

运行程序:

09/11/20 21:44:09 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.

09/11/20 21:44:10 INFO mapred.FileInputFormat: Total input paths to process : 2

09/11/20 21:44:11 INFO mapred.JobClient: Running job: job_200911202139_0001

09/11/20 21:44:12 INFO mapred.JobClient:? map 0% reduce 0%

09/11/20 21:44:24 INFO mapred.JobClient:? map 33% reduce 0%

09/11/20 21:44:26 INFO mapred.JobClient:? map 66% reduce 0%

09/11/20 21:44:28 INFO mapred.JobClient:? map 100% reduce 0%

09/11/20 21:44:35 INFO mapred.JobClient:? map 100% reduce 100%

09/11/20 21:44:36 INFO mapred.JobClient: Job complete: job_200911202139_0001

09/11/20 21:44:37 INFO mapred.JobClient: Counters: 16

09/11/20 21:44:37 INFO mapred.JobClient: ? File Systems

09/11/20 21:44:37 INFO mapred.JobClient: ? ? HDFS bytes read=97

09/11/20 21:44:37 INFO mapred.JobClient: ? ? HDFS bytes written=246

09/11/20 21:44:37 INFO mapred.JobClient: ? ? Local bytes read=243

09/11/20 21:44:37 INFO mapred.JobClient: ? ? Local bytes written=582

09/11/20 21:44:37 INFO mapred.JobClient: ? Job Counters?

09/11/20 21:44:37 INFO mapred.JobClient: ? ? Launched reduce tasks=1