基于mapreduce的Hadoop join实现分析(一)
对于一个大数据的分析应用,join是必不可少的一项功能.现在很多构建与hadoop之上的应用,如Hive,PIG等在其内部实现了join程序,可以通过很简单的sql语句或者数据操控脚本完成相应的Join工作.那么join应该如何实现呢?今天我们就对join做一个简单的实现.
我们来看一个例子,现在有两组数据:一组为单位人员信息,如下:
人员ID
+ numOutside.toString());
}
reader.close();
}
?
}
程序主体很简单,开始将输出目录删除,中间进行一系列的JobConf设定工作,将输出格式设为SequenceFile,最后读出程序结果到控制台.接下来我们看看Mapper的实现:
import java.io.IOException;
?
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.io.*;
?
public class JoinMapper extends MapReduceBase?
implements Mapper<LongWritable, Text, LongWritable, Record> {
?
public void map(LongWritable key, Text value,
OutputCollector<LongWritable, Record> output, Reporter reporter)
throws IOException {
String line = value.toString();
String[] values = line.split(",");
if(values.length == 2){ //这里使用记录的长度来区别地址信息与人员信息,当然可以通过其他方式(如文件名等)来实现
Record reco = new Record();
reco.locId = values[0];
reco.type = 2;
reco.locationName = values[1];+ reco.toString());
}
}
?
for(Record e : employees){
e.locationName = thisLocation.locationName;
output.collect(new LongWritable(0), new Text(e.toString()));
}
System.out.println("+++++++++++++++");
}
}
在reducer端,我们先构造了一个地址对象,thisLocation用来保存地址信息.在reducer的迭代器values中,如果某个value是地址,就将其保存到thisLocation中.否则就将人员信息加入到List中以供后面打印.
这个reducer中有两点需要非常注意:
一,在while (values.hasNext())的循环中的thisLocation = new Record(reco)以及Record recoClone = new Record(reco)语句,我们不能直接保存reducer的迭代器中的对象,因为迭代器中每次返回的对象都是同一个Object,但是具有不同的值.注意,一定要注意.
二,这个是一个比较蹩脚的reduce实现,从程序中我们可以看到.我们用了一个List来保存某个地址ID的所有人员信息,对于一个非常巨大的应用来说,某个地址ID可能具有大于List长度的人员信息,这就会造成List溢出.下次对该程序进行优化从而能够避免该现象.
好啦,看看数据和程序的运行结果吧!
$ ./hadoop fs -cat input/join/names
1,张三,1
2,李四,2
3,王五,1
4,赵六,3
5,马七,3
?
$ ./hadoop fs -cat input/join/locations
1,北京
2,上海
3,广州
?
运行程序:
09/11/20 21:44:09 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
09/11/20 21:44:10 INFO mapred.FileInputFormat: Total input paths to process : 2
09/11/20 21:44:11 INFO mapred.JobClient: Running job: job_200911202139_0001
09/11/20 21:44:12 INFO mapred.JobClient:? map 0% reduce 0%
09/11/20 21:44:24 INFO mapred.JobClient:? map 33% reduce 0%
09/11/20 21:44:26 INFO mapred.JobClient:? map 66% reduce 0%
09/11/20 21:44:28 INFO mapred.JobClient:? map 100% reduce 0%
09/11/20 21:44:35 INFO mapred.JobClient:? map 100% reduce 100%
09/11/20 21:44:36 INFO mapred.JobClient: Job complete: job_200911202139_0001
09/11/20 21:44:37 INFO mapred.JobClient: Counters: 16
09/11/20 21:44:37 INFO mapred.JobClient: ? File Systems
09/11/20 21:44:37 INFO mapred.JobClient: ? ? HDFS bytes read=97
09/11/20 21:44:37 INFO mapred.JobClient: ? ? HDFS bytes written=246
09/11/20 21:44:37 INFO mapred.JobClient: ? ? Local bytes read=243
09/11/20 21:44:37 INFO mapred.JobClient: ? ? Local bytes written=582
09/11/20 21:44:37 INFO mapred.JobClient: ? Job Counters?
09/11/20 21:44:37 INFO mapred.JobClient: ? ? Launched reduce tasks=1