hadoop 排序汇总
? ? 前段时间写过一个利用python收集客户端的日志,并启动hadoop进行离线分析,最后用php展示的log日志分析系统,今天开始重新整理一下,python和php在这里就不提起了,就讲一下hadoop的使用。
? ? 业务需求(部分):要求在一天的日志里,把最热门的url按照访问次数和流量大小排序分别统计出来(每个客户都有自己的域名,可以有多个)。
? ? log的内容格式:domain(域名) url(请求url) size(请求大小),每个字段以空格隔开。
? ? 实现的部分代码如下:
? ? 我这边做2次mapreduce,第一次,先把访问次数和流量大小合计起来,第二次再给它排序。
? ? 第一遍的map实现伪代码:
? ? ? ? ?// 将域名和请求url作为key
?key.append(域名).append(分割符).append(请求url); ?
? ? ? ? ?// 将访问次数1和流量做为value
? ? ? ? ?val.append("1").append(分割符).append(大小);
? ? ? 第一遍reduce实现伪代码
? ? ? ? ? String[] datas = value.toString().split(分割符);
? ? ? ? ? sumTimes += Long.valueOf(datas[0]); // 访问次数
? ? ? ? ? sumSize += Long.valueOf(datas[1]); ? ?// 流量?
?
? ? ?经过这次的mapreduce之后,它的输出是按照域名和请求url来排序的,要想让它按照流量或者访问次数排序,则必须经过第2次mapreduce,我们要自定义排序规则。在做第2次mapreduce之前,我们得先做下本地reduce(我们这里是求合计可以先做本地reduce,如果是求平均值就要另想办法),这样可以较大幅度的提高性能(因为是经过合计之后的数据才发到别的节点上去,这样可以较大的减少数据传输和IO次数)。
? ? ? 本地reduce其实就是一个reduce的一个实现类,实现原理跟第一次的reduce代码几乎一样,只是要注意一点,本地reduce的输入就是第一次reduce的输出,本地reduce的输出就是第2次map的输入。
? ? ? 第2遍map实现和reduce其实都很简单,关键是一个自定义的排序类,如下:
? ? ? ? ??public class Sort implements WritableComparable<Sort> {
private String domain = "";private long size = 0; /** * Set the left and right values. */public void set(String domain, String size) {this.domain = domain;if(size == null || size.trim().equals("")) size = "0";this.size = Long.parseLong(size);}public String getDomain() {return domain;}public long getSize() {return size;}@Overridepublic int hashCode() {return this.domain.hashCode() * 157 + (int) this.size;}@Overridepublic boolean equals(Object right) {if (right instanceof Sort) {Sort r = (Sort) right;return r.domain.equals(domain) && r.size == size;} else {return false;}}@Overridepublic int compareTo(Sort o) {if (!this.domain.equals(o.domain)) {return domain.compareTo(o.domain) < 0 ? -1 : 1;} else if (size != o.size) {return size < o.size ? 1 : -1;} else {return 0;}}@Overridepublic void readFields(DataInput in) throws IOException {this.domain = Text.readString(in);this.size = in.readLong() + Long.MIN_VALUE;}@Overridepublic void write(DataOutput out) throws IOException {Text.writeString(out, this.domain);out.writeLong(this.size - Long.MIN_VALUE);}}
? ?第2遍map实现类关键点:
? ? ? ?public class MapSize extends MapReduceBase implements Mapper<LongWritable, Text, Sort, Text> {
private Sort sizeSort = new Sort(); public void map(LongWritable key, Text value, OutputCollector<Sort, Text> output, Reporter reporter) throws IOException { ...// 流量sizeSort.set(key, 流量或者大小); //我这里省略写法,其实是第一遍分析之后产生2个文件输出分别为流量和访问次数output.collect(sizeSort, value); }}
? ? 第2遍reduce实现关键点:
? ? ?public class Reduce extends MapReduceBase implements
Reducer<Sort, Text, NullWritable, Text> {private NullWritable n = NullWritable.get();public void reduce(Sort key, Iterator<Text> values,OutputCollector<NullWritable, Text> output, Reporter reporter)throws IOException {}}
?
经过2次mapreduce之后如果setNumReduceTasks设置的个数为1,那这个结果就是我们想要的了,但是hadoop不可能只有一个节点在跑reduce,这个数通常要大一些,所以我们还得设置分区(其实第一遍分析的时候就要设置分区了)。
public class MyPartitioner implements Partitioner<Text, Writable> {@Overridepublic int getPartition(Text key, Writable value, int numPartitions) { // 按照流量大小分区自定义分区规则,比方说一共有10台机器, //流量的范围在0-1000000之间 // 那么可以这样划分每100000一个分区 return Math.abs( 流量大小 / (numPartitions * 100000)+ 1;}@Overridepublic void configure(JobConf arg0) {}}
?
??最后文件的输入名字自己定义一下(每个reduce一个输出文件),按照自己的规则把那些文件合并在一起就得到全局排序的结果了。排版有点乱,调不好,请见谅。