首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 互联网 >

Bulkload：将 HDFS 数据转换为 HFile 的一个 Demo

2013-10-23 
Bulkload：将 HDFS 数据转换为 HFile 的一个 Demo。package com.taobao.bulkload.job; import java.io.IOException; import …

Bulkload：将 HDFS 数据转换为 HFile 的一个 Demo
package com.taobao.bulkload.job;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;


public class HFileGenerator {


public static class HBaseHFileMapper extends
Mapper<BytesWritable, Text, ImmutableBytesWritable, KeyValue> {


private ImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable();

private static final String DELIMITER_STR = "\001";


protected void map(BytesWritable key, Text value, Context context) {
try {
String valueStr = value.toString();
String[] strs = valueStr.split(DELIMITER_STR);
String auctionIdStr = strs[0];
String rowKey = new StringBuffer(auctionIdStr).reverse().toString();
immutableBytesWritable.set(Bytes.toBytes(rowKey));
KeyValue minPrice = createKeyValue(rowKey,strs[7],"D");
context.write(immutableBytesWritable, minPrice);
KeyValue avgPrice = createKeyValue(rowKey,strs[8],"E");
context.write(immutableBytesWritable, avgPrice);
KeyValue minReservePrice = createKeyValue(rowKey,strs[4],"C");
context.write(immutableBytesWritable, minReservePrice);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}


private KeyValue createKeyValue(String row,String str,String qualifier) {
String family = "F";
return new KeyValue(Bytes.toBytes(row), Bytes.toBytes(family),
Bytes.toBytes(qualifier), System.currentTimeMillis(),
Bytes.toBytes(str));
}


public static void main(String[] args) throws IOException,
InterruptedException, ClassNotFoundException {

Configuration conf = HBaseConfiguration.create();


Job job = new Job(conf, "testhbasehfile");
conf.addResource("hbase-site.xml");


TableMapReduceUtil.addDependencyJars(conf, HBaseConfiguration.class);


job.setJarByClass(HFileGenerator.class);
job.setMapperClass(HBaseHFileMapper.class);
job.setReducerClass(KeyValueSortReducer.class);


job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);


job.setInputFormatClass(SequenceFileInputFormat.class);
                      
job.setOutputFormatClass(HFileOutputFormat.class);

HTable table = new HTable(conf,"${your table}");

HFileOutputFormat.configureIncrementalLoad(job, table);

FileInputFormat.addInputPath(job, new Path("${path}"+preDateStr()));
HFileOutputFormat.setOutputPath(job, new Path("${path}"+preDateStr()));


System.exit(job.waitForCompletion(true) ? 0 : 1);
}

public static String preDateStr(){

Calendar yesterday = Calendar.getInstance();
yesterday.add(Calendar.DAY_OF_MONTH, -1);
yesterday.set(Calendar.HOUR_OF_DAY, 0);
yesterday.set(Calendar.MINUTE, 0);
yesterday.set(Calendar.SECOND, 0);

DateFormat df=new SimpleDateFormat("yyyyMMdd");
return df.format(yesterday.getTime());
}
}
}

热点排行