// MapReduce job: delete HBase data by reading (scanning) an HBase table.
package foo.bar.MR;import java.io.IOException;import java.util.ArrayList;import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.KeyValue;import org.apache.hadoop.hbase.client.Delete;import org.apache.hadoop.hbase.client.HTable;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapreduce.TableMapper;import org.apache.hadoop.hbase.util.Bytes;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;import foo.bar.validate.Const;public class DropRowByTimeStampMapReduce { public static Configuration configuration; public static List<String> rowkeyList = new ArrayList<String>(); public static List<String> qualifierList = new ArrayList<String>(); static { configuration = HBaseConfiguration.create(); configuration.set("hbase.zookeeper.quorum",Const.ZOOKEEPER_QUORAM); configuration.set("hbase.rootdir", Const.HBASE_ROOTDIR); } static class MyMapper extends TableMapper<Text, LongWritable> {public void map(ImmutableBytesWritable row, Result r, Context context)throws InterruptedException, IOException {String tableName = context.getConfiguration().get("tableName");HTable htbl = new HTable(configuration, tableName);List<Delete> lists = new ArrayList<Delete>();for (KeyValue kv : r.raw()) {Delete dlt = new Delete(kv.getRow());dlt.deleteColumn(kv.getFamily(), kv.getQualifier(), kv.getTimestamp());lists.add(dlt);System.out.println("delete-- gv:"+Bytes.toString(kv.getRow())+",family:"+Bytes.toString(kv.getFamily())+",qualifier:"+Bytes.toString(kv.getQualifier())+",timestamp:"+kv.getTimestamp());}htbl.delete(lists);htbl.flushCommits();htbl.close();}} public static void main(String[] args) throws 
Exception { if(args.length!=2){ return ; } String tableName = args[0]; String timeStamp = args[1]; Configuration config = HBaseConfiguration.create(); config.set("tableName", tableName); Job job = new Job(config, "ExampleRead"); job.setJarByClass(DropRowByTimeStamp.class); // class that contains mapper Scan scan = new Scan(); scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs scan.setCacheBlocks(false); // don't set to true for MR jobs scan.setTimeStamp(new Long(timeStamp)); TableMapReduceUtil.initTableMapperJob( tableName, // input HBase table name scan, // Scan instance to control CF and attribute selection MyMapper.class, // mapper null, // mapper output key null, // mapper output value job); job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from mapper boolean b = job.waitForCompletion(true); if (!b) { throw new IOException("error with job!"); } } }