《权威指南》笔记七 MapReduce的类型和格式
作者:JesseZhang (CNZQS|JesseZhang)
博客地址:http://www.cnzqs.com
要点:
1、MapReduce类型
1.1默认的MapReduce Job
2、输入格式
2.1输入分片与记录
2.2文本输入
2.3二进制输入
2.4多种输入
2.5数据库输入(和输出)
3、输出格式
3.1文本输出
3.2二进制输出
3.3多个输出
3.4延迟输出
3.5数据库输出
?
============================================
1、 MapReduce类型
Map和Reduce函数的常规格式:
map:(K1,V1) ——> list(K2,V2)
reduce:(K2,list(V2)) ——> list(K3,V3)
combine:(K2,list(V2)) ——> list(K2,V2)
partition:(K2,V2) ——> integer
?
类型分类:可以设置的属性和必须与类型相容的属性。其中可设置的属性,需要在程序中显式指定。
?
类型冲突是在作业执行过程中被检测出来的,所以比较明智的做法是先用少量的数据跑一次测试任务,发现并修正任何类型不兼容的问题。
1.1默认的MapReduce Job
?
0.20.2 后,相比之前,改变了如下内容:
1、Job替换JobConf,通过Configuration来代入。
?
可以在Job中设置MapperClass和ReduceClass,以及分区、任务数等属性。
?
选择reducer的个数:
1、????????????? reducer最优个数与集群中可用的reducer任务槽数相关。
2、????????????? 总槽数=集群中节点数*每个节点的任务槽数。
3、????????????? 总槽数可以在mapred.tasktracker.reduce.tasks.maximun属性设置
常用方法:
?
默认的Streaming作业
Hadoop streaming是Hadoop的一个工具, 它帮助用户创建和运行一类特殊的map/reduce作业, 这些特殊的map/reduce作业是由一些可执行文件或脚本文件充当mapper或者reducer。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper /bin/cat \
-reducer /bin/wc
注意:必须提供一个mapper。
可以通过一个可配置的分隔符来进行分割,也可以由多个字段组合(Maper和Reduce的分隔符配置是相对独立的,可以分别设置)。可以由一条记录的前n个字段组成(由stream.num.map.output.key.fields 或 stream.num.reduce.output.key.Fields进行定义),剩下的字段就是值。
?
2、 输入格式
2.1输入分片与记录
层次关系是:输入分片(split)与map对应,是每个map处理的唯一单位。每个分片包括多条记录,每个记录都有对应键值对。
输入切片的接口:InputSplit接口(在org.interface InputSplit extends Writable){
long getLength() throws IOException;
String[] getLocations() throws IOException;
}
?
一个分片并不包含数据本身,而是指向数据的引用。存储位置供MapReduce系统使用以便将map任务尽量放在分片数据附近,而长度用来排序分片,以便优化处理最大的分片,从而最小化作业运行时间。
InputSplit不需要开发人员直接处理,由InputFormat创建。
?
FileInputFormat类:静态方法:addInputPath addInputPaths
路径可以表示一个文件、一个目录或是一个glob,即一个文件和目录的集合。
表示目录的路径包含这个目录下的所有的文件(是否包括子目录?答案:不包括,如果包含子目录,会被当做文件,出现错误)
???????? 过滤文件的方法:setInputPathFilter()
???????? 不设置过滤器,则默认排除隐藏文件(名称中已”.”和“_”开头的文件)
FileInputFormat类的输入分片:
FileInputFormat只分割大文件(超过HDFS块大小)。分片通常与HDFS块大小一样,但可以通过属性设置:mapred.min.split.size? 和 mapred.max.split.size 和 dfs.block.size
最小分片大小,通常是1个字节。
分片大小的计算公式:
Max(minimumSize,min(maximumSize,blockSize))
minimumSize < blockSize < maximumSize
?
通过CombineFileInputFormat来合并小文件,环节小文件过多影响执行速度的问题。
应该尽量避免小文件,因为MapReduce处理数据的最佳速度最好与数据在集群中的传输速度相同,小文件会增加寻址次数、浪费NameNode的内存。
?
使用CombineFileInputFormat的时候需要自己实现getRecordReader()方法。
?
避免切分
1、? 把最小分片大小设置为最大
2、? 使用FileInputFormat具体子类,并且重载isSplitable()方法,将返回值设置为false
?
继承CombineFileInputFormat而不是FileInputFormat。
?
???????? 2.2文本输入
TextInputFormat:
键:LongWritable类型,存储该行在整个文件中的字节偏移量
值:这行的内容,不包括任何终止符(换行符和回车符)
?
输入分片和HDFS块之间可能不能很好的匹配,出现跨块的情况。
?
KeyValueTextInputFormat : 分界符 :key.value.separator.in.input.line
?
NLineInputFormat:与TextInputFormat一样,键是文件中行的字节偏移量,值是行本身。主要是希望mapper收到固定行数的输入。
XML:
1、? 大多数XML文件不大,可以通过“把一个文件当成一条记录处理”的方式解决。
2、? StreamXmlRecordReader
?
???????? 2.3二进制输入
SequenceFileInputFormat
SequenceFileAsTextInputFormat
SequenceFileAsBinaryInputFormat
?
2.4多种输入
1、MultipleInputs类处理多种格式的输入,允许为每个输入路径指定InputFormat和Mapper。
2、两个mapper的输出类型是一样的,所以reducer看到的是聚集后的map输出,并不知道输入是不同的mapper产生的。
3、重载版本:addInputPath(),没有mapper参数,主要支持多种输入格式只有一个mapper。
?
?
2.5数据库输入(和输出)???????
DBInputFormat/DBOutPutFormat:
1、? 使用JDBC
2、? 适合加载小量的数据集,否则对数据库负担较重
?
在关系数据库和HDFS中移动数据的另一个办法是:使用Sqoop
?
3、 输出格式
???????? 3.1文本输出
1、TextOutputFormat
2、默认分隔符制表符。可以设置:mapred.textoutputformat.separator
?
???????? 3.2二进制输出
SequenceFileOutputFormat:输出为顺序文件,结构紧凑,易压缩
SequenceFileAsBinaryOutputFormat
SequenceFileOutputFormat
?
???????? 3.3多个输出
默认输出文件名格式:part-00000
通过MultipleOutputFormat 和MultipleOutputs进行文件名控制和输出多个文件。
通过setOutputFormat指定。
?
MultipleOutputFormat:格式化输出
MultipleOutputs:可以为不同的输出产生不同的类型
?
3.4延迟输出
3.5数据库输出