Notes on Generating HFiles with Hadoop and Bulk Loading Them into HBase
Please credit the source when reposting: http://blackwing.iteye.com/blog/1991380
HBase ships with the ImportTsv class, which turns TSV input (per the official docs, plain text with fields separated by \t) directly into HFiles, and with a second class, org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles, which moves those HFiles into the table's directory on HDFS.

PS: I saw someone online claim that generating the HFiles and loading them into HBase in one go is less efficient than generating the HFiles first and then moving them into the HBase directory with LoadIncrementalHFiles. I have not verified this; my approach is also to generate first, then move.
The official documentation is here:
http://hbase.apache.org/book/ops_mgt.html#importtsv
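The same ops guide also documents the companion step for moving the bulk output into a table, the completebulkload tool; its usage is roughly:

bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hdfs://storefile-outputdir> <tablename>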
Take my data as an example; each record looks like this:

topsid  uid     roler_num  typ  time
10      111111  255        0    1386553377000
It could be turned into HFiles with ImportTsv directly:

bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,kq:topsid,kq:uid,kq:roler_num,kq:type -Dimporttsv.bulk.output=hdfs://storefile-outputdir <tablename> <hdfs-data-inputdir>
ImportTsv, however, stores every column as its own cell, roughly:

row: 10    cf: kq    qualifier: topsid    value: 10
...

whereas what I actually need is a composite rowkey, with the type as the qualifier and 1 as the value:

row: 10-111111-255    cf: kq    qualifier: 0    value: 1

Since ImportTsv cannot produce that layout, I wrote my own MapReduce job to generate the HFiles. The mapper:
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * adminOnOff.log file format:
 * topsid uid roler_num typ time
 */
public class HFileImportMapper2 extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    protected SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
    protected final String CF_KQ = "kq"; // 考勤 (attendance) column family
    protected final int ONE = 1;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        System.out.println("line : " + line);
        String[] datas = line.split("\\s+");
        // rowkey format: yyyyMMdd-sid-uid-role_num-timestamp-typ
        String row = sdf.format(new Date(Long.parseLong(datas[4])))
                + "-" + datas[0] + "-" + datas[1] + "-" + datas[2]
                + "-" + datas[4] + "-" + datas[3];
        ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(row));
        KeyValue kv = new KeyValue(Bytes.toBytes(row),
                this.CF_KQ.getBytes(), datas[3].getBytes(),
                Bytes.toBytes(this.ONE));
        context.write(rowkey, kv);
    }
}
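To make the rowkey format concrete, here is a small standalone sketch (the class name and the printed value are mine, not from the original post) that builds the key for the sample record above:

import java.text.SimpleDateFormat;
import java.util.Date;

public class RowKeyDemo {
    public static void main(String[] args) {
        String[] datas = "10 111111 255 0 1386553377000".split("\\s+");
        String day = new SimpleDateFormat("yyyyMMdd").format(new Date(Long.parseLong(datas[4])));
        // rowkey format: yyyyMMdd-sid-uid-role_num-timestamp-typ
        String row = day + "-" + datas[0] + "-" + datas[1] + "-" + datas[2]
                + "-" + datas[4] + "-" + datas[3];
        // Prints something like 20131209-10-111111-255-1386553377000-0
        // (the date part depends on the JVM's default timezone).
        System.out.println(row);
    }
}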
The driver that sets up and submits the job:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GenHFile2 {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource("myConf.xml");
        String input = conf.get("input");
        String output = conf.get("output");
        String tableName = conf.get("source_table");
        System.out.println("table : " + tableName);
        HTable table;
        try {
            // Delete any existing intermediate output directory before running.
            try {
                FileSystem fs = FileSystem.get(URI.create(output), conf);
                fs.delete(new Path(output), true);
                fs.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

            table = new HTable(conf, tableName.getBytes());
            Job job = new Job(conf);
            job.setJobName("Generate HFile");
            job.setJarByClass(HFileImportMapper2.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(HFileImportMapper2.class);
            FileInputFormat.setInputPaths(job, input);

            //job.setReducerClass(KeyValueSortReducer.class);
            //job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            //job.setMapOutputValueClass(KeyValue.class);
            job.getConfiguration().set("mapred.mapoutput.key.class",
                    "org.apache.hadoop.hbase.io.ImmutableBytesWritable");
            job.getConfiguration().set("mapred.mapoutput.value.class",
                    "org.apache.hadoop.hbase.KeyValue");

            //job.setOutputFormatClass(HFileOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path(output));
            //job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
            HFileOutputFormat.configureIncrementalLoad(job, table);

            try {
                job.waitForCompletion(true);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
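The driver reads input, output and source_table from myConf.xml via conf.addResource(). The post does not show that file; a minimal sketch in standard Hadoop configuration format, with placeholder values, would be:

<?xml version="1.0"?>
<!-- Minimal sketch of myConf.xml; the paths and table name are placeholders. -->
<configuration>
  <property>
    <name>input</name>
    <value>hdfs://namenode/input/adminOnOff.log</value>
  </property>
  <property>
    <name>output</name>
    <value>hdfs://namenode/output</value>
  </property>
  <property>
    <name>source_table</name>
    <value>kq_table</value>
  </property>
</configuration>

Note that conf.addResource("myConf.xml") loads the file from the classpath, so it has to be packaged with the job or placed on the client's classpath.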
After the job finishes, the generated HFiles end up under the output directory, in one subdirectory per column family, for example:

hdfs://namenode/output/kq/601c5029fb264dc8869a635043c24560
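Once the HFiles exist, the move into HBase can also be done programmatically with LoadIncrementalHFiles. A minimal sketch, assuming the same 0.94-era HTable API used above and the same myConf.xml (the class name is mine):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class MoveHFiles {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.addResource("myConf.xml");
        HTable table = new HTable(conf, conf.get("source_table"));
        // Pass the job's output directory (the one containing the per-family
        // subdirectories such as .../output/kq), not an individual HFile.
        new LoadIncrementalHFiles(conf).doBulkLoad(new Path(conf.get("output")), table);
        table.close();
    }
}

The same thing can be done from the shell with the completebulkload tool mentioned earlier.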
The key call in the driver is:

HFileOutputFormat.configureIncrementalLoad(job, table);

Its source shows how much it sets up automatically: the sort reducer matching the map output value class, the partitioner built from the table's region start keys (and hence the number of reduce tasks), and the compression, bloom filter and block size settings read from the table's column families:
public static void configureIncrementalLoad(Job job, HTable table) throws IOException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(HFileOutputFormat.class);

    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
        job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
        job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
        job.setReducerClass(TextSortReducer.class);
    } else {
        LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());

    // Use table's region boundaries for TOP split points.
    LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions "
        + "to match current region count");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys);
    // Set compression algorithms based on column families
    configureCompression(table, conf);
    configureBloomType(table, conf);
    configureBlockSize(table, conf);

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured.");
}