A Journey Through HBase, Part 4: HBase MapReduce by Example (reposted from: Taobao QA Team)
Introduction
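Because HBase integrates seamlessly with Hadoop, MapReduce is a convenient way to run distributed computations over HBase data. Using the blog example from the earlier posts, this article covers the key points of MapReduce development on HBase. It assumes some familiarity with Hadoop MapReduce. If you are new to Hadoop MapReduce programming, read http://qa.taobao.com/?p=10523 first for the basic concepts.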
Core HBase MapReduce classes
First, let's review the basic MapReduce programming model.
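The model can be sketched in a few lines of plain Java. This is not Hadoop code. The class `MiniMapReduce`, its `run` method and the tag-counting input are hypothetical illustrations, assuming only the JDK.

The sketch works in three stages:
- The mapper emits key/value pairs through a callback that plays the role of `context.write`.
- The framework groups the values by key. A `TreeMap` stands in for Hadoop's sort.
- The reducer folds each group into one result.

HBase's `TableMapper` keeps this shape but fixes the map input to a row key (`ImmutableBytesWritable`) plus that row's `Result`. `TableReducer` typically emits `Put` objects that are written to a table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

/** Hypothetical in-memory model of map -> shuffle/sort -> reduce; not Hadoop code. */
public class MiniMapReduce {

    /** Maps every record, groups emitted values by key in sorted order, then reduces each group. */
    public static <IN, K extends Comparable<K>, V, OUT> Map<K, OUT> run(
            List<IN> input,
            BiConsumer<IN, BiConsumer<K, V>> mapper,
            BiFunction<K, List<V>, OUT> reducer) {
        // Shuffle/sort: a TreeMap keeps keys ordered, much as Hadoop sorts map output by key
        Map<K, List<V>> groups = new TreeMap<>();
        for (IN record : input) {
            // The second argument plays the role of context.write(key, value)
            mapper.accept(record, (k, v) -> groups.computeIfAbsent(k, x -> new ArrayList<>()).add(v));
        }
        // Reduce: one call per distinct key, receiving all of that key's values
        Map<K, OUT> result = new TreeMap<>();
        groups.forEach((k, vs) -> result.put(k, reducer.apply(k, vs)));
        return result;
    }

    /** Example job: count how many articles carry each tag (input = comma-separated tag strings). */
    public static Map<String, Integer> tagCounts(List<String> articleTags) {
        return MiniMapReduce.<String, String, Integer, Integer>run(
                articleTags,
                (tags, emit) -> {
                    for (String tag : tags.split(",")) {
                        emit.accept(tag, 1);
                    }
                },
                (tag, ones) -> ones.size());
    }

    public static void main(String[] args) {
        // prints {hadoop=1, hbase=3, mapreduce=1}
        System.out.println(tagCounts(List.of("hadoop,hbase", "hbase,mapreduce", "hbase")));
    }
}
```

The grouping step is what lets the reducer see every value for one key at once. The FindFriend job below relies on exactly that to collect all authors who used the same tag.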
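Implementation
With the analysis above, the implementation is fairly simple. It takes only three steps:
1. Write a Mapper that extends TableMapper.
2. Write a Reducer that extends TableReducer.
3. Configure and submit the job in main.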
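// Written against the 0.9x-era API used by the original (Result.list(), Put.add());
// newer HBase releases replace these with Result.listCells() and Put.addColumn().
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;

public class FindFriend {

    // Map: for each row of "blog", emit (tag, author nickname) once per tag of the article
    public static class Mapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
        public Mapper() {}

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {
            ImmutableBytesWritable value = null;
            String[] tags = null;
            for (KeyValue kv : values.list()) {
                if ("author".equals(Bytes.toString(kv.getFamily()))
                        && "nickname".equals(Bytes.toString(kv.getQualifier()))) {
                    value = new ImmutableBytesWritable(kv.getValue());
                }
                if ("article".equals(Bytes.toString(kv.getFamily()))
                        && "tags".equals(Bytes.toString(kv.getQualifier()))) {
                    tags = Bytes.toString(kv.getValue()).split(",");
                }
            }
            // Skip rows missing either column; otherwise tags.length throws a NullPointerException
            if (value == null || tags == null) {
                return;
            }
            for (int i = 0; i < tags.length; i++) {
                // Lower-case the tag so "HBase" and "hbase" land in the same reduce group
                ImmutableBytesWritable key = new ImmutableBytesWritable(
                        Bytes.toBytes(tags[i].toLowerCase()));
                try {
                    context.write(key, value);
                } catch (InterruptedException e) {
                    throw new IOException(e);
                }
            }
        }
    }

    // Reduce: join all nicknames sharing a tag and store them in person:nicknames of "tag_friend"
    public static class Reducer extends TableReducer<ImmutableBytesWritable, ImmutableBytesWritable, ImmutableBytesWritable> {
        @Override
        public void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values,
                Context context) throws IOException, InterruptedException {
            StringBuilder friends = new StringBuilder();
            for (ImmutableBytesWritable val : values) {
                if (friends.length() > 0) {
                    friends.append(',');
                }
                // Read only the valid slice in case the backing array is larger than the value
                friends.append(Bytes.toString(val.get(), val.getOffset(), val.getLength()));
            }
            // The tag becomes the row key of the output table
            Put put = new Put(key.get());
            put.add(Bytes.toBytes("person"), Bytes.toBytes("nicknames"),
                    Bytes.toBytes(friends.toString()));
            context.write(key, put);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "HBase_FindFriend");
        job.setJarByClass(FindFriend.class);
        // Scan only the two columns the mapper reads
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("author"), Bytes.toBytes("nickname"));
        scan.addColumn(Bytes.toBytes("article"), Bytes.toBytes("tags"));
        // Input table "blog"; map output key and value are both ImmutableBytesWritable
        TableMapReduceUtil.initTableMapperJob("blog", scan, FindFriend.Mapper.class,
                ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);
        // Puts emitted by the reducer are written to table "tag_friend"
        TableMapReduceUtil.initTableReducerJob("tag_friend", FindFriend.Reducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}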
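The map and reduce bodies above only split, lower-case, group and join strings. That logic can therefore be checked in a local JVM before a cluster is involved.

The sketch below is a hypothetical plain-Java mirror of that logic. `FindFriendLogic`, `mapRow`, `reduceTag` and `findFriends` are illustrative names, not part of HBase or the original code. Plain Strings stand in for `ImmutableBytesWritable`, `Result` and `Put`, and a `TreeMap` stands in for Hadoop's shuffle.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java mirror of the FindFriend map/reduce bodies, for local checks only. */
public class FindFriendLogic {

    /** Map step: one (lower-cased tag, nickname) pair per tag; rows missing either column emit nothing. */
    public static List<Map.Entry<String, String>> mapRow(String nickname, String tags) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        if (nickname == null || tags == null) {
            return out;
        }
        for (String tag : tags.split(",")) {
            out.add(new SimpleEntry<>(tag.toLowerCase(), nickname));
        }
        return out;
    }

    /** Reduce step: comma-join every nickname seen for one tag, like the Reducer's friends string. */
    public static String reduceTag(List<String> nicknames) {
        return String.join(",", nicknames);
    }

    /** Runs map, groups pairs by tag (the shuffle), then reduces; each row is {nickname, tags}. */
    public static Map<String, String> findFriends(List<String[]> rows) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] row : rows) {
            for (Map.Entry<String, String> e : mapRow(row[0], row[1])) {
                grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
            }
        }
        Map<String, String> result = new TreeMap<>();
        grouped.forEach((tag, names) -> result.put(tag, reduceTag(names)));
        return result;
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
                new String[] {"Tom", "HBase,Hadoop"},
                new String[] {"Jerry", "hbase"},
                new String[] {"Anna", null});
        // prints {hadoop=Tom, hbase=Tom,Jerry}
        System.out.println(findFriends(rows));
    }
}
```

Like the original Reducer, this sketch does not remove duplicate nicknames. Here the nicknames for a tag appear in input order, but a real Hadoop job makes no guarantee about the order of the values passed to reduce. The nicknames stored in tag_friend may therefore come out in a different order.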
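Summary
This article used a worked example to show how to analyze HBase data with MapReduce. The pattern shown here is the common one: read data from one table, analyze it, and store the results in another table. MapReduce on HBase is not limited to this pattern, but the other approaches work in much the same way. If you have followed along this far, you probably want to run the job and see the results. The next article will show how to run MapReduce jobs locally in a simulated cluster environment for testing.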