首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

hadoop 排序集锦

2012-08-28 
hadoop 排序汇总? ? 前段时间写过一个利用python收集客户端的日志,并启动hadoop进行离线分析,最后用php展

hadoop 排序汇总

? ? 前段时间写过一个利用python收集客户端的日志,并启动hadoop进行离线分析,最后用php展示的log日志分析系统,今天开始重新整理一下,python和php在这里就不提起了,就讲一下hadoop的使用。

? ? 业务需求(部分):要求在一天的日志里,把最热门的url按照访问次数和流量大小排序分别统计出来(每个客户都有自己的域名,可以有多个)。

? ? log的内容格式:domain(域名) url(请求url) size(请求大小),每个字段以空格隔开。


? ? 实现的部分代码如下:

? ? 我这边做2次mapreduce,第一次,先把访问次数和流量大小合计起来,第二次再给它排序。

? ? 第一遍的map实现伪代码:


? ? ? ? ?// 将域名和请求url作为key

?key.append(域名).append(分割符).append(请求url); ?


? ? ? ? ?// 将访问次数1和流量做为value

? ? ? ? ?val.append("1").append(分割符).append(大小);


? ? ? 第一遍reduce实现伪代码

? ? ? ? ? String[] datas = value.toString().split(分割符);

? ? ? ? ? sumTimes += Long.valueOf(datas[0]); // 访问次数

? ? ? ? ? sumSize += Long.valueOf(datas[1]); ? ?// 流量?

?

? ? ?经过这次的mapreduce之后,它的输出是按照域名和请求url来排序的,要想让它按照流量或者访问次数排序,则必须经过第2次mapreduce,我们要自定义排序规则。在做第2次mapreduce之前,我们得先做下本地reduce(我们这里是求合计可以先做本地reduce,如果是求平均值就要另想办法),这样可以较大幅度的提高性能(因为是经过合计之后的数据才发到别的节点上去,这样可以较大的减少数据传输和IO次数)。

? ? ? 本地reduce其实就是一个reduce的一个实现类,实现原理跟第一次的reduce代码几乎一样,只是要注意一点,本地reduce的输入就是第一次reduce的输出,本地reduce的输出就是第2次map的输入。

? ? ? 第2遍map实现和reduce其实都很简单,关键是一个自定义的排序类,如下:

? ? ? ? ??public class Sort implements WritableComparable<Sort> {

private String domain = "";private long size = 0;    /** * Set the left and right values. */public void set(String domain, String size) {this.domain = domain;if(size == null || size.trim().equals("")) size = "0";this.size = Long.parseLong(size);}public String getDomain() {return domain;}public long getSize() {return size;}@Overridepublic int hashCode() {return this.domain.hashCode() * 157 + (int) this.size;}@Overridepublic boolean equals(Object right) {if (right instanceof Sort) {Sort r = (Sort) right;return r.domain.equals(domain) && r.size == size;} else {return false;}}@Overridepublic int compareTo(Sort o) {if (!this.domain.equals(o.domain)) {return domain.compareTo(o.domain) < 0 ? -1 : 1;} else if (size != o.size) {return size < o.size ? 1 : -1;} else {return 0;}}@Overridepublic void readFields(DataInput in) throws IOException {this.domain = Text.readString(in);this.size = in.readLong() + Long.MIN_VALUE;}@Overridepublic void write(DataOutput out) throws IOException {Text.writeString(out, this.domain);out.writeLong(this.size - Long.MIN_VALUE);}}

? ?第2遍map实现类关键点:

? ? ? ?public class MapSize extends MapReduceBase implements Mapper<LongWritable, Text, Sort, Text> {

    private Sort sizeSort = new Sort();        public void map(LongWritable key, Text value, OutputCollector<Sort, Text> output, Reporter reporter) throws IOException {  ...// 流量sizeSort.set(key, 流量或者大小); //我这里省略写法,其实是第一遍分析之后产生2个文件输出分别为流量和访问次数output.collect(sizeSort, value);      }}

? ? 第2遍reduce实现关键点:

? ? ?public class Reduce extends MapReduceBase implements

Reducer<Sort, Text, NullWritable, Text> {private NullWritable n = NullWritable.get();public void reduce(Sort key, Iterator<Text> values,OutputCollector<NullWritable, Text> output, Reporter reporter)throws IOException {}}

?

经过2次mapreduce之后如果setNumReduceTasks设置的个数为1,那这个结果就是我们想要的了,但是hadoop不可能只有一个节点在跑reduce,这个数通常要大一些,所以我们还得设置分区(其实第一遍分析的时候就要设置分区了)。

public class MyPartitioner implements Partitioner<Text, Writable> {@Overridepublic int getPartition(Text key, Writable value, int numPartitions) {                 // 按照流量大小分区自定义分区规则,比方说一共有10台机器,                 //流量的范围在0-1000000之间                 // 那么可以这样划分每100000一个分区                 return Math.abs( 流量大小 / (numPartitions * 100000)+ 1;}@Overridepublic void configure(JobConf arg0) {}}

?

??最后文件的输入名字自己定义一下(每个reduce一个输出文件),按照自己的规则把那些文件合并在一起就得到全局排序的结果了。排版有点乱,调不好,请见谅。

热点排行