首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

mapreduce编程模型之hbase输入hdfs多路输出

2014-01-15 
mapreduce编程模型之hbase输入hdfs多路输出：import java.io.IOException; import java.text.SimpleDateFormat; ……

mapreduce编程模型之hbase输入hdfs多路输出
import java.io.IOException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;import com.bfd.util.Const;public class IPCount {static class MyMapper extends TableMapper<Text, Text> {@Overridepublic void map(ImmutableBytesWritable row, Result value,Context context) throws IOException, InterruptedException { for (KeyValue kv : value.raw()) {val = new String(kv.getValue(),"UTF-8");qualifier = new String(kv.getQualifier());if(qualifier.indexOf(">brand")==-1){context.write(gid, outVal);}}}}static class MyReducer extends Reducer<Text, Text, Text, Text> {@SuppressWarnings("rawtypes")private MultipleOutputs multipleOutputs; protected void setup(Context context) throws IOException, InterruptedException {multipleOutputs =new MultipleOutputs<Text,Text>(context);}protected void cleanup(Context context) throws IOException, InterruptedException {multipleOutputs.close();}@SuppressWarnings("unchecked")public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {if(isTrue){multipleOutputs.write(NullWritable.get(),gid+"\t"+value,"active_normal");}else{multipleOutputs.write(NullWritable.get(),gid+"\t"+value,"nonactive_normal");}} }public static void main(String[] args) throws Exception {Configuration conf = 
HBaseConfiguration.create();conf.set("hbase.zookeeper.quorum", Const.ZOOKEEPER_QUORAM);conf.set("zookeeper.znode.parent", Const.ZOOKEEPER_ZNODE_PARENT);Job job = new Job(conf, "IPCount");job.setJarByClass(IPCount.class);Scan scan = new Scan();scan.setCaching(500);scan.setCacheBlocks(false);TableMapReduceUtil.initTableMapperJob(args[0],scan,MyMapper.class,Text.class,Text.class,job);job.setReducerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setNumReduceTasks(10);FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);}}

?

热点排行