首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 服务器 > 云计算 >

Mapreduce范例-分组排重(group by distinct)

2013-09-07 
Mapreduce实例-分组排重(group by distinct)需要实现以下几个类,代码太多,列了下主要代码,可根据排重数据

Mapreduce实例-分组排重(group by distinct)

需要实现以下几个类,代码太多,列了下主要代码,可根据排重数据的特征判读是否需要添加combiner来提速。

 

public class GroupComparator implements RawComparator<MyBinaryKey> {  @Override public int compare(MyBinaryKey o1, MyBinaryKey o2) {  return o1.toString().compareTo(o2.toString()); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {  return WritableComparator.compareBytes(b1, s1, Long.SIZE / 8 + Integer.SIZE / 8 * 3, b2, s2,  Long.SIZE / 8 + Integer.SIZE / 8 * 3); }}public abstract class UVBinaryKey  extends BinaryComparable implements WritableComparable<BinaryComparable>{ //根据需要添加属性;  @Override public void readFields(DataInput in) throws IOException {} @Override public byte[] getBytes() {}}public class MyPartitioner extends Partitioner<MyBinaryKey, NullWritable> { /**  * 根据uv/ip取模分区,保证相同uv/ip落在同一分区  */ @Override public int getPartition(MyBinaryKey key, NullWritable value, int numPartitions) {    int k=0;  for(byte b : key.getAttr()){   k+=b&0xff;  }  return k%numPartitions; }}  job.setMapOutputKeyClass(UVBinaryKey.class);  job.setGroupingComparatorClass(GroupComparator.class);   job.setPartitionerClass(MyPartitioner.class);map略
combiner(根据需要添加)reduce中的实现:       @Override        protected void reduce(UVBinaryKey key, Iterable<NullWritable> values, Context context)                throws IOException,                InterruptedException {            long count = 0;            byte[] tbsign = null;            for (NullWritable nullWritable : values) {                byte[] attr = key.getAttr();                if (tbsign == null) {                    tbsign = attr;                    count++;                }                if (tbsign != null) {                    if (tbsign.length != attr.length) {                        count++;                        tbsign = attr;                    } else {                        for (int i = 0; i < tbsign.length; i++) {                            if (tbsign[i] != attr[i]) {                                count++;                                tbsign = attr;                                break;                            }                        }                    }                }            }            StringBuffer out = new StringBuffer();            out.append(new String(key.getCity()))                    .append(Constants.FIELDS_TERMINATED).append(count);            context.write(new Text(out.toString()), NullWritable.get());        }


 

 

 

 

热点排行