首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

MapReduce 通过读取 HBase 表删除 HBase 数据

2013-12-17 
MapReduce 通过读取 HBase 表删除 HBase 数据:package foo.bar.MR; import java.io.IOException; import java.ut…

MapReduce 通过读取 HBase 表删除 HBase 数据

package foo.bar.MR;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import foo.bar.validate.Const;

/**
 * MapReduce job that scans an HBase table for cells carrying one specific
 * timestamp and deletes exactly those cell versions from the same table.
 *
 * <p>Usage: {@code DropRowByTimeStampMapReduce <tableName> <timestamp>}.
 */
public class DropRowByTimeStampMapReduce {

  /** HBase configuration pointing at the cluster named in {@link Const}. */
  public static Configuration configuration;

  // Not referenced inside this class; kept because they are public and may be used elsewhere.
  public static List<String> rowkeyList = new ArrayList<String>();
  public static List<String> qualifierList = new ArrayList<String>();

  static {
    configuration = HBaseConfiguration.create();
    configuration.set("hbase.zookeeper.quorum", Const.ZOOKEEPER_QUORAM);
    configuration.set("hbase.rootdir", Const.HBASE_ROOTDIR);
  }

  /**
   * Mapper that issues a delete for every cell version it is handed by the scan.
   * It emits no output records.
   */
  static class MyMapper extends TableMapper<Text, LongWritable> {

    /** Table the deletes are sent to; opened once per task instead of once per row. */
    private HTable table;

    /**
     * Opens the target table named by the {@code tableName} job property.
     *
     * @throws IOException if the table cannot be opened
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      String tableName = context.getConfiguration().get("tableName");
      table = new HTable(configuration, tableName);
    }

    /**
     * Deletes every cell version contained in {@code r} and logs each one to stdout.
     *
     * @param row     row key of the scanned row (unused; the key is read from {@code r})
     * @param r       scan result holding the cells that matched the timestamp filter
     * @param context task context (unused here; nothing is emitted)
     * @throws IOException if the delete request fails
     */
    @Override
    public void map(ImmutableBytesWritable row, Result r, Context context)
        throws InterruptedException, IOException {
      if (r.isEmpty()) {
        return; // nothing to delete for this row
      }
      // All cells of a Result belong to one row, so a single Delete covers them.
      Delete delete = new Delete(r.getRow());
      for (KeyValue kv : r.raw()) {
        delete.deleteColumn(kv.getFamily(), kv.getQualifier(), kv.getTimestamp());
        System.out.println("delete-- gv:" + Bytes.toString(kv.getRow())
            + ",family:" + Bytes.toString(kv.getFamily())
            + ",qualifier:" + Bytes.toString(kv.getQualifier())
            + ",timestamp:" + kv.getTimestamp());
      }
      table.delete(delete);
    }

    /**
     * Flushes pending mutations and closes the table, even if the flush fails.
     *
     * @throws IOException if flushing or closing fails
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      if (table == null) {
        return;
      }
      try {
        table.flushCommits();
      } finally {
        table.close();
        table = null;
      }
    }
  }

  /**
   * Configures and runs the job.
   *
   * @param args {@code args[0]} is the HBase table name, {@code args[1]} the cell
   *             timestamp (a decimal long) whose cells should be deleted
   * @throws NumberFormatException if the timestamp is not a valid long
   * @throws IOException           if the job finishes unsuccessfully
   */
  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: DropRowByTimeStampMapReduce <tableName> <timestamp>");
      return;
    }
    String tableName = args[0];
    // Parse up front so a malformed timestamp fails before any job setup.
    long timeStamp = Long.parseLong(args[1]);

    // Seed from the class-level configuration so the scan reads from the same
    // cluster the mapper deletes from.
    Configuration config = HBaseConfiguration.create(configuration);
    config.set("tableName", tableName);

    Job job = new Job(config, "ExampleRead");
    job.setJarByClass(DropRowByTimeStampMapReduce.class); // class that contains mapper

    Scan scan = new Scan();
    scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false);  // don't set to true for MR jobs
    scan.setTimeStamp(timeStamp);

    TableMapReduceUtil.initTableMapperJob(
        tableName,        // input HBase table name
        scan,             // Scan instance to control CF and attribute selection
        MyMapper.class,   // mapper
        null,             // mapper output key
        null,             // mapper output value
        job);
    job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from mapper
    job.setNumReduceTasks(0);                         // the mapper emits nothing, so no reduce phase is needed

    boolean succeeded = job.waitForCompletion(true);
    if (!succeeded) {
      throw new IOException("error with job!");
    }
  }
}

?

热点排行