首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 互联网 >

MapReduce构建HBase索引

2013-09-26 
MapReduce构建HBase索引:package test; import java.io.IOException; import java.util.HashMap; import org.a…

MapReduce构建HBase索引

package test;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Map-only MapReduce job that builds secondary-index tables for an HBase table.
 *
 * <p>For every indexed column {@code <field>} of the source table, the mapper writes a row
 * into the table named {@code <table>-<field>}: the row key is the column's value and the
 * single cell {@code INDEX:ROW} holds the row key of the source row.
 *
 * <p>Usage: {@code IndexBuilder <TABLE_NAME> <COLUMN_FAMILY> <ATTR> [<ATTR> ...]}.
 * For example, to index the {@code name} and {@code email} columns of table {@code heroes}
 * in column family {@code info}, pass the arguments {@code heroes info name email}.
 */
public class IndexBuilder {

    /** Column family of the only column in an index table ({@code INDEX:ROW}). */
    public static final byte[] INDEX_COLUMN = Bytes.toBytes("INDEX");

    /** Qualifier of the only column in an index table ({@code INDEX:ROW}). */
    public static final byte[] INDEX_QUALIFIER = Bytes.toBytes("ROW");

    /**
     * Mapper that turns each source row into one {@link Put} per indexed column that has a
     * value in that row.
     */
    public static class Map
            extends Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Writable> {

        /** Column family the indexed columns are read from. */
        private byte[] family;

        /**
         * Maps a column qualifier to the name of its index table ("table-column").
         * The key is used to read the column's value (which becomes the index row key);
         * the value is the output table name. The map is only ever iterated, never looked
         * up by key, so the identity-based hashing of {@code byte[]} keys is harmless here.
         */
        private HashMap<byte[], ImmutableBytesWritable> indexes;

        /**
         * Emits, for each indexed column present in {@code result}, a {@link Put} keyed by the
         * column value that stores the source row key in {@code INDEX:ROW}.
         *
         * @param rowKey  row key of the source row
         * @param result  the source row
         * @param context task context used to write the (index table name, Put) pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(ImmutableBytesWritable rowKey, Result result, Context context)
                throws IOException, InterruptedException {
            for (java.util.Map.Entry<byte[], ImmutableBytesWritable> index : indexes.entrySet()) {
                byte[] qualifier = index.getKey();                    // column name
                ImmutableBytesWritable tableName = index.getValue();  // index table name
                // Look the cell up by "family:qualifier".
                byte[] value = result.getValue(family, qualifier);
                if (value != null) {
                    // The column value becomes the row key; INDEX:ROW stores the source row key.
                    Put put = new Put(value);
                    put.add(INDEX_COLUMN, INDEX_QUALIFIER, rowKey.get());
                    // With MultiTableOutputFormat the key names the target table and the
                    // value must be a Put or a Delete.
                    context.write(tableName, put);
                }
            }
        }

        /**
         * Reads the job parameters ({@code index.tablename}, {@code index.fields},
         * {@code index.familyname}) from the configuration and prepares the
         * column-to-index-table mapping. Called once per task, before any call to
         * {@code map}.
         *
         * @throws IOException if {@code index.familyname} or {@code index.fields} is not set
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration configuration = context.getConfiguration();

            String tableName = configuration.get("index.tablename");
            // Names of the columns to index.
            String[] fields = configuration.getStrings("index.fields");
            String familyName = configuration.get("index.familyname");

            // Fail with a descriptive error instead of a bare NullPointerException.
            if (familyName == null) {
                throw new IOException("Missing job parameter: index.familyname");
            }
            if (fields == null) {
                throw new IOException("Missing job parameter: index.fields");
            }

            family = Bytes.toBytes(familyName);

            indexes = new HashMap<byte[], ImmutableBytesWritable>();
            for (String field : fields) {
                // Indexing column "name" of table "heroes" writes to table "heroes-name".
                indexes.put(Bytes.toBytes(field),
                        new ImmutableBytesWritable(Bytes.toBytes(tableName + "-" + field)));
            }
        }
    }

    /**
     * Builds the map-only index job.
     *
     * @param conf configuration the job parameters are stored in
     * @param args {@code args[0]} is the source table, {@code args[1]} the column family and
     *             every following element a column to index
     * @return the configured, not yet submitted job
     * @throws IOException if the job cannot be created or the scan cannot be serialized
     */
    public static Job configureJob(Configuration conf, String[] args) throws IOException {
        String tableName = args[0];
        String columnFamily = args[1];
        System.out.println("****" + tableName);

        // Parameters reach the mapper tasks through the Configuration.
        // NOTE(review): convertScanToString may not be public in every HBase release —
        // confirm against the HBase version this is built with.
        conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
        conf.set(TableInputFormat.INPUT_TABLE, tableName);
        conf.set("index.tablename", tableName);
        // The column family comes from the command line. It used to be overwritten with the
        // hard-coded value "attributes" right after being set, which silently ignored args[1].
        conf.set("index.familyname", columnFamily);

        String[] fields = Arrays.copyOfRange(args, 2, args.length);
        conf.setStrings("index.fields", fields);

        // Job setup: map-only, reading from one table and writing to several.
        Job job = new Job(conf, tableName);
        job.setJarByClass(IndexBuilder.class);
        job.setMapperClass(Map.class);
        job.setNumReduceTasks(0);
        job.setInputFormatClass(TableInputFormat.class);
        job.setOutputFormatClass(MultiTableOutputFormat.class);
        return job;
    }

    /**
     * Command-line entry point; exits with 0 on success, 1 if the job fails and -1 on a
     * usage error.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 3) {
            System.err.println("Only " + otherArgs.length + " arguments supplied, required: 3");
            System.err.println(
                    "Usage: IndexBuilder <TABLE_NAME> <COLUMN_FAMILY> <ATTR> [<ATTR> ...]");
            System.exit(-1);
        }
        Job job = configureJob(conf, otherArgs);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

// To run: to build indexes on the name and email columns of table heroes (column family
// info), pass the program arguments: heroes info name email

?

热点排行