首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 其他相关 >

Hadoop学习小结之三:Map-Reduce入门

2012-11-07 
Hadoop学习总结之三:Map-Reduce入门?2、编写Map-Reduce程序编写Map-Reduce程序,一般需要实现两个函数:mappe

Hadoop学习总结之三:Map-Reduce入门

?

2、编写Map-Reduce程序

编写Map-Reduce程序,一般需要实现两个函数:mapper中的map函数和reducer中的reduce函数。

一般遵循以下格式:

map: (K1, V1)? ->? list(K2, V2)

public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {

? void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)

? throws IOException;

}

reduce: (K2, list(V))? ->? list(K3, V3)?

public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {

? void reduce(K2 key, Iterator<V2> values,

????????????? OutputCollector<K3, V3> output, Reporter reporter)

??? throws IOException;

}

?

对于上面的例子,则实现的mapper如下:

?

public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

??? @Override

??? public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

??????? String line = value.toString();

??????? String year = line.substring(15, 19);

??????? int airTemperature;

??????? if (line.charAt(25) == '+') {

??????????? airTemperature = Integer.parseInt(line.substring(26, 30));

??????? } else {

??????????? airTemperature = Integer.parseInt(line.substring(25, 30));

??????? }

??????? output.collect(new Text(year), new IntWritable(airTemperature));

??? }

}

实现的reducer如下:

public class MaxTemperatureReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

??? public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {

??????? int maxValue = Integer.MIN_VALUE;

??????? while (values.hasNext()) {

??????????? maxValue = Math.max(maxValue, values.next().get());

??????? }

??????? output.collect(key, new IntWritable(maxValue));

??? }

}

?

欲运行上面实现的Mapper和Reduce,则需要生成一个Map-Reduce得任务(Job),其基本包括以下三部分:

输入的数据,也即需要处理的数据 Map-Reduce程序,也即上面实现的Mapper和Reducer 此任务的配置项JobConf

欲配置JobConf,需要大致了解Hadoop运行job的基本原理:

Hadoop将Job分成task进行处理,共两种task:map task和reduce task Hadoop有两类的节点控制job的运行:JobTracker和TaskTracker JobTracker协调整个job的运行,将task分配到不同的TaskTracker上 TaskTracker负责运行task,并将结果返回给JobTracker Hadoop将输入数据分成固定大小的块,我们称之input split Hadoop为每一个input split创建一个task,在此task中依次处理此split中的一个个记录(record) Hadoop会尽量让输入数据块所在的DataNode和task所执行的DataNode(每个DataNode上都有一个TaskTracker)为同一个,可以提高运行效率,所以input split的大小也一般是HDFS的block的大小。 Reduce task的输入一般为Map Task的输出,Reduce Task的输出为整个job的输出,保存在HDFS上。 在reduce中,相同key的所有的记录一定会到同一个TaskTracker上面运行,然而不同的key可以在不同的TaskTracker上面运行,我们称之为partition partition的规则为:(K2, V2) –> Integer, 也即根据K2,生成一个partition的id,具有相同id的K2则进入同一个partition,被同一个TaskTracker上被同一个Reducer进行处理。

public interface Partitioner<K2, V2> extends JobConfigurable {

? int getPartition(K2 key, V2 value, int numPartitions);

}

下图大概描述了Map-Reduce的Job运行的基本原理:

?

Hadoop学习小结之三:Map-Reduce入门

?

?

下面我们讨论JobConf,其有很多的项可以进行配置:

setInputFormat:设置map的输入格式,默认为TextInputFormat,key为LongWritable, value为Text setNumMapTasks:设置map任务的个数,此设置通常不起作用,map任务的个数取决于输入的数据所能分成的input split的个数 setMapperClass:设置Mapper,默认为IdentityMapper setMapRunnerClass:设置MapRunner, map task是由MapRunner运行的,默认为MapRunnable,其功能为读取input split的一个个record,依次调用Mapper的map函数 setMapOutputKeyClass和setMapOutputValueClass:设置Mapper的输出的key-value对的格式 setOutputKeyClass和setOutputValueClass:设置Reducer的输出的key-value对的格式 setPartitionerClass和setNumReduceTasks:设置Partitioner,默认为HashPartitioner,其根据key的hash值来决定进入哪个partition,每个partition被一个reduce task处理,所以partition的个数等于reduce task的个数 setReducerClass:设置Reducer,默认为IdentityReducer setOutputFormat:设置任务的输出格式,默认为TextOutputFormat FileInputFormat.addInputPath:设置输入文件的路径,可以使一个文件,一个路径,一个通配符。可以被调用多次添加多个路径 FileOutputFormat.setOutputPath:设置输出文件的路径,在job运行前此路径不应该存在

当然不用所有的都设置,由上面的例子,可以编写Map-Reduce程序如下:

public class MaxTemperature {

??? public static void main(String[] args) throws IOException {

??????? if (args.length != 2) {

??????????? System.err.println("Usage: MaxTemperature <input path> <output path>");

??????????? System.exit(-1);

??????? }

??????? JobConf conf = new JobConf(MaxTemperature.class);

??????? conf.setJobName("Max temperature");

??????? FileInputFormat.addInputPath(conf, new Path(args[0]));

??????? FileOutputFormat.setOutputPath(conf, new Path(args[1]));

??????? conf.setMapperClass(MaxTemperatureMapper.class);

??????? conf.setReducerClass(MaxTemperatureReducer.class);

??????? conf.setOutputKeyClass(Text.class);

??????? conf.setOutputValueClass(IntWritable.class);

??????? JobClient.runJob(conf);

??? }

}

3、Map-Reduce数据流(data flow)

Map-Reduce的处理过程主要涉及以下四个部分:

客户端Client:用于提交Map-reduce任务job JobTracker:协调整个job的运行,其为一个Java进程,其main class为JobTracker TaskTracker:运行此job的task,处理input split,其为一个Java进程,其main class为TaskTracker HDFS:hadoop分布式文件系统,用于在各个进程间共享Job相关的文件

Hadoop学习小结之三:Map-Reduce入门

?

3.1、任务提交

JobClient.runJob()创建一个新的JobClient实例,调用其submitJob()函数。

向JobTracker请求一个新的job ID 检测此job的output配置 计算此job的input splits 将Job运行所需的资源拷贝到JobTracker的文件系统中的文件夹中,包括job jar文件,job.xml配置文件,input splits 通知JobTracker此Job已经可以运行了

提交任务后,runJob每隔一秒钟轮询一次job的进度,将进度返回到命令行,直到任务运行完毕。

?

3.2、任务初始化

?

当JobTracker收到submitJob调用的时候,将此任务放到一个队列中,job调度器将从队列中获取任务并初始化任务。

初始化首先创建一个对象来封装job运行的tasks, status以及progress。

在创建task之前,job调度器首先从共享文件系统中获得JobClient计算出的input splits。

其为每个input split创建一个map task。

每个task被分配一个ID。

?

3.3、任务分配

?

TaskTracker周期性的向JobTracker发送heartbeat。

在heartbeat中,TaskTracker告知JobTracker其已经准备运行一个新的task,JobTracker将分配给其一个task。

在JobTracker为TaskTracker选择一个task之前,JobTracker必须首先按照优先级选择一个Job,在最高优先级的Job中选择一个task。

TaskTracker有固定数量的位置来运行map task或者reduce task。

默认的调度器对待map task优先于reduce task

当选择reduce task的时候,JobTracker并不在多个task之间进行选择,而是直接取下一个,因为reduce task没有数据本地化的概念。

?

3.4、任务执行

?

TaskTracker被分配了一个task,下面便要运行此task。

首先,TaskTracker将此job的jar从共享文件系统中拷贝到TaskTracker的文件系统中。

TaskTracker从distributed cache中将job运行所需要的文件拷贝到本地磁盘。

其次,其为每个task创建一个本地的工作目录,将jar解压缩到文件目录中。

其三,其创建一个TaskRunner来运行task。

TaskRunner创建一个新的JVM来运行task。

被创建的child JVM和TaskTracker通信来报告运行进度。

?3.4.1、Map的过程

MapRunnable从input split中读取一个个的record,然后依次调用Mapper的map函数,将结果输出。

map的输出并不是直接写入硬盘,而是将其写入缓存memory buffer。

当buffer中数据的到达一定的大小,一个背景线程将数据开始写入硬盘。

在写入硬盘之前,内存中的数据通过partitioner分成多个partition。

在同一个partition中,背景线程会将数据按照key在内存中排序。

每次从内存向硬盘flush数据,都生成一个新的spill文件。

当此task结束之前,所有的spill文件被合并为一个整的被partition的而且排好序的文件。

reducer可以通过http协议请求map的输出文件,tracker.http.threads可以设置http服务线程数。

3.4.2、Reduce的过程

当map task结束后,其通知TaskTracker,TaskTracker通知JobTracker。

对于一个job,JobTracker知道TaskTracer和map输出的对应关系。

reducer中一个线程周期性的向JobTracker请求map输出的位置,直到其取得了所有的map输出。

reduce task需要其对应的partition的所有的map输出。

reduce task中的copy过程即当每个map task结束的时候就开始拷贝输出,因为不同的map task完成时间不同。

reduce task中有多个copy线程,可以并行拷贝map输出。

当很多map输出拷贝到reduce task后,一个背景线程将其合并为一个大的排好序的文件。

当所有的map输出都拷贝到reduce task后,进入sort过程,将所有的map输出合并为大的排好序的文件。

最后进入reduce过程,调用reducer的reduce函数,处理排好序的输出的每个key,最后的结果写入HDFS。

?

Hadoop学习小结之三:Map-Reduce入门

?

3.5、任务结束

?

当JobTracker获得最后一个task的运行成功的报告后,将job得状态改为成功。

当JobClient从JobTracker轮询的时候,发现此job已经成功结束,则向用户打印消息,从runJob函数中返回。

1 楼 Lewiss 2011-06-17   个人觉得这是精华总结,搂着强悍!!!

热点排行