
Generating HFiles with Hadoop and Loading Them Directly into HBase

2013-12-20 

Please credit the original source when reposting: http://blackwing.iteye.com/blog/1991380


HBase ships with an ImportTsv class that can turn TSV input (according to the official docs, text with fields separated by \t) directly into HFiles, and with another class, org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles, that then moves those HFiles into the table's directory on HDFS.

PS: I saw someone online claim that generating HFiles and writing them straight into HBase is less efficient than generating the HFiles first and then moving them into the HBase directory with LoadIncrementalHFiles. I haven't verified this; my approach is also to generate first and then move.
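The post doesn't show the "move" step itself. For reference, it can be done with the completebulkload tool (bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hfile-dir> <tablename>) or programmatically. Below is a minimal sketch of the programmatic form, assuming the 0.94-era API; the table name "my_table" and the HFile directory are placeholders for whatever your own job produced:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadHFiles {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // "my_table" and the output path below are placeholders; use the
        // table and HFile directory produced by your own job.
        HTable table = new HTable(conf, "my_table");
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path("hdfs://namenode/output"), table);
        table.close();
    }
}

doBulkLoad moves the HFiles into the table's region directories (splitting any file that spans a region boundary), so they disappear from the output directory once the load finishes.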

The official documentation is here:

http://hbase.apache.org/book/ops_mgt.html#importtsv

But ImportTsv doesn't fit my needs. For example, my file format is:

topsid   uid      roler_num   typ   time
10       111111   255         0     1386553377000

The ImportTsv command would be:
bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,kq:topsid,kq:uid,kq:roler_num,kq:type -Dimporttsv.bulk.output=hdfs://storefile-outputdir <hdfs-data-inputdir>


The table layout it generates is:

row: 10    cf: kq    qualifier: topsid    value: 10    .....

Whereas the layout I need is:

row: 10-111111-255    cf: kq    qualifier: 0    value: 1


So it was easier to write my own MapReduce job to process the data.
Mapper:
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * adminOnOff.log file format:
 * topsid   uid   roler_num   typ   time
 */
public class HFileImportMapper2 extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    protected SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
    protected final String CF_KQ = "kq"; // attendance column family
    protected final int ONE = 1;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        System.out.println("line : " + line);
        // Split the log line on whitespace: topsid, uid, roler_num, typ, time
        String[] datas = line.split("\\s+");
        // Row key format: yyyyMMdd-sid-uid-role_num-timestamp-typ
        String row = sdf.format(new Date(Long.parseLong(datas[4])))
                + "-" + datas[0] + "-" + datas[1] + "-" + datas[2]
                + "-" + datas[4] + "-" + datas[3];
        ImmutableBytesWritable rowkey = new ImmutableBytesWritable(Bytes.toBytes(row));
        // One KeyValue per line: family "kq", qualifier = typ, value = 1
        KeyValue kv = new KeyValue(Bytes.toBytes(row),
                this.CF_KQ.getBytes(), datas[3].getBytes(),
                Bytes.toBytes(this.ONE));
        context.write(rowkey, kv);
    }
}


job:
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GenHFile2 {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource("myConf.xml");
        String input = conf.get("input");
        String output = conf.get("output");
        String tableName = conf.get("source_table");
        System.out.println("table : " + tableName);
        HTable table;
        try {
            // Delete any existing intermediate output directory before running
            try {
                FileSystem fs = FileSystem.get(URI.create(output), conf);
                fs.delete(new Path(output), true);
                fs.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

            table = new HTable(conf, tableName.getBytes());
            Job job = new Job(conf);
            job.setJobName("Generate HFile");

            job.setJarByClass(HFileImportMapper2.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(HFileImportMapper2.class);
            FileInputFormat.setInputPaths(job, input);

            //job.setReducerClass(KeyValueSortReducer.class);
            //job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            //job.setMapOutputValueClass(KeyValue.class);
            job.getConfiguration().set("mapred.mapoutput.key.class",
                    "org.apache.hadoop.hbase.io.ImmutableBytesWritable");
            job.getConfiguration().set("mapred.mapoutput.value.class",
                    "org.apache.hadoop.hbase.KeyValue");

            //job.setOutputFormatClass(HFileOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path(output));
            //job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
            HFileOutputFormat.configureIncrementalLoad(job, table);

            try {
                job.waitForCompletion(true);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
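The driver pulls input, output and source_table from a myConf.xml on the classpath rather than from command-line arguments. A hypothetical myConf.xml (the paths and table name below are placeholders, not from the original post) would look something like:

<?xml version="1.0"?>
<configuration>
  <property>
    <name>input</name>
    <value>hdfs://namenode/logs/adminOnOff.log</value>
  </property>
  <property>
    <name>output</name>
    <value>hdfs://namenode/output</value>
  </property>
  <property>
    <name>source_table</name>
    <value>my_table</value>
  </property>
</configuration>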


The generated HFiles end up under /output on HDFS, already organized into a subdirectory per column family name:
hdfs://namenode/output/kq/601c5029fb264dc8869a635043c24560

In the driver above, the call:
HFileOutputFormat.configureIncrementalLoad(job,table);

automatically configures the following job parameters, as you can see from its source:
public static void configureIncrementalLoad(Job job, HTable table)
    throws IOException {
  Configuration conf = job.getConfiguration();

  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(KeyValue.class);
  job.setOutputFormatClass(HFileOutputFormat.class);

  // Based on the configured map output class, set the correct reducer to properly
  // sort the incoming values.
  // TODO it would be nice to pick one or the other of these formats.
  if (KeyValue.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(KeyValueSortReducer.class);
  } else if (Put.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(PutSortReducer.class);
  } else if (Text.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(TextSortReducer.class);
  } else {
    LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
  }

  conf.setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      KeyValueSerialization.class.getName());

  // Use table's region boundaries for TOP split points.
  LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
  List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
  LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
      "to match current region count");
  job.setNumReduceTasks(startKeys.size());

  configurePartitioner(job, startKeys);
  // Set compression algorithms based on column families
  configureCompression(table, conf);
  configureBloomType(table, conf);
  configureBlockSize(table, conf);

  TableMapReduceUtil.addDependencyJars(job);
  TableMapReduceUtil.initCredentials(job);
  LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured.");
}


Note that HFileOutputFormat only supports writing a single column family; if your table has multiple column families, you need to run multiple jobs.
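One workaround (my own sketch, not from the original post) is to run the generation job once per column family, each writing to its own output directory, and then bulk-load each directory separately. The "gen.cf" configuration key, the family list and the paths below are hypothetical, and the mapper would need a small change to read the family name from the configuration instead of hard-coding "kq":

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GenHFilePerCF {
    public static void main(String[] args) throws Exception {
        // Hypothetical family list, paths and table name; adjust for your own setup.
        String[] families = {"kq", "other_cf"};
        String input = "hdfs://namenode/logs/adminOnOff.log";
        String outputBase = "hdfs://namenode/output";
        String tableName = "my_table";

        for (String cf : families) {
            Configuration conf = HBaseConfiguration.create();
            conf.set("gen.cf", cf); // a per-family mapper variant would read this key
            HTable table = new HTable(conf, tableName);

            Job job = new Job(conf, "Generate HFile for " + cf);
            job.setJarByClass(HFileImportMapper2.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(HFileImportMapper2.class); // or the per-family variant
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(KeyValue.class);

            FileInputFormat.setInputPaths(job, input);
            // One HFile output directory per column family
            FileOutputFormat.setOutputPath(job, new Path(outputBase + "/" + cf));
            HFileOutputFormat.configureIncrementalLoad(job, table);

            job.waitForCompletion(true);
            table.close();
        }
    }
}

Each resulting directory can then be moved into HBase with LoadIncrementalHFiles as shown earlier.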
