MapReduce Design Patterns(chapter 1)(一)
翻译的是这本书:
MapReduce 是一种运行于成百上千台机器上的处理数据的框架,目前被google,Hadoop等多家公司或社区广泛使用。这种计算框架是非常强大,但它没有提供一个处理所谓“big data”的通用,普遍的情形,所以它能很好的解决一些问题,在处理某些问题上也存在挑战。这本书教给你在什么问题上适合使用MapReduce和怎样高效的使用它。
初次接触时,很多人没有意识到MapReduce是一种计算框架而不仅仅是一个工具。你必须找到合适的map和reduce框架作为合适的解决方案,而这种map和reduce可能并不适用于其它情形。MapReduce算不上是特征,更确切的说是一种规则。
这使得问题解决起来更容易也更难了。它明确告诉你能做什么不能做什么,比起通常使用的方法,它提供的可选方式也少了。同时,理解怎样用规则解决问题需要灵活的头脑和思维的转变。
开始学习MapReduce非常像递归的学习:在找出问题的递归情形上存在挑战,当你一旦发现,问题也就变得清晰,简明,优雅。在很多情况下,你必须考虑到被MapReduce job,特别是内部集群网络使用的的系统资源的利用率。这里要权衡的是:指定的MapReduce框架是否有能力在分布式计算中处理数据,而不去考虑并发,鲁棒性,数据规模及其它的挑战。使用独特的系统,独特的解决问题的方式,便引入了设计模式。
什么是MapReduce的设计模式?它是一种使用MapReduce解决常规数据处理问题的模板。模式不会限定于特定的领域,比如:文本处理或图形分析。它只是一种处理问题的通用手段。
使用设计模式就是使用被久经考验并证明是好的设计原则来更好的构建你的软件。
由于一些原因,设计一款好的软件存在挑战性,想在MapReduce里实现一种好的设计也是相似的。就像好的程序猿能够把创作出不好的软件归因于糟糕的设计,好的程序猿也能创作出差的MapReduce算法。使用MapReduce时,我们不仅要保证代码的整洁和可维护性,还要保证分布于成百上千各节点上所要计算的TB甚至PB数量级数据的job的性能。除此之外,这个job还可能存在与共享集群上其他job存在竞争。这使得使用MapReduce时选择一种正确的设计变得极其重要,用的好的话,在性能上能获得几个数量级的提高。在我们深入讨论设计模式之前,先说说怎样和为什么设计模式和MapReduce一起使用会很有意义,还有需要具备的知识及从哪获得。
Design Patterns
众多的设计模式已经让开发者们悠哉悠哉了很多年。他们是通用,可复用的解决问题的工具,所以开发者可以在短时间内想出克服障碍的方法并进行下面的开发。它们也是有经验的开发者们用一种简洁的方式把他们的知识传递给下一代的手段。
在软件工程设计模式领域,一个重要的里程碑就是这本书:Design Patterns: Elements of Reusable Object-Oriented Software, by Gamma etal. (Addison-Wesley Professional, 1995)。众人所知的“Gang of Four”book。在这本非常流行的书里,没有一个模式是新的,并且已经使用了好多年。它仍然非常有影响力的原因是:作者花时间证明了最重要的设计模式在面向对象编程中普遍适用。自从这本书1994年发行至今,很多对设计感兴趣的人让这些模式口口相传,或出现在各种学术会议,期刊,互联网。
设计模式已经经得起时间考验并显示了正确的抽象度:不确定性,即有太多的的模式要记忆并且很难适用一个问题。不通用性,大量工作涌入一个模式去运行。这种抽象度也带来一个重要的好处:提供给开发者口头和书面交流的通用语言。简单的说“abstract factory”要比去解释抽象工厂是什么是什么要容易的多。并且,看到别人写的实现抽象工厂的代码时,也就大体知道要完成什么样的工作。
MapReduce设计模式在局部的问题和情况下也适用于这样的规则。它提供通用的框架用于解决数据计算问题,但不需要指定问题所在的领域。有经验的MapReduce开发者能把解决通用问题的知识传递给初学者。这点是极其重要的,因为MapReduce是一种新兴的技术并且被快速的被接受,每天都有很多程序猿加入社区。MapReduce设计模式也提供了一种团队合作解决MapReduce问题时的公用语言。设想一种情形,开发者说应该使用“reduce-side join”代替“map-side replicated join”比解释它们各自底层的实现机制更简明。
现在MapReduce世界的状态跟1994年前面向对象世界很相似。模式已经通过博客,站点比如StackOverflow,深入其它书籍中,深入全球先进的技术开发团队,组织中。写这本书的目的不是提供现在还没有人见过的开创性的MapReduce问题的处理方式,而是收集了各自领域有经验的开发者分享的模式。
注释:目前提供的设计模式,真实的MapReduce范例方面的经验在使用的时候仍然需要深入理解。当你用这本书或其他地方看到的模式解决一个新的问题的时候,密切注意”Applicability”部分。
对大多数部分,书中的模式是平台无关性的。MapReduce被Google以一种没有任何实际源代码的范例发布,不管作为独立系统(例如:Hadoop,Disco,Amazon Elastic MapReduce)还是查询语言系统(例如:MongoDB, Greenplum DB, Aster Data),已经被实现了好几次。为了让设计模式更倾向于通用,我们站在hadoop角度写这本书。很多模式可以应用于其他系统,比如MoongoDB,因为他们概念上的架构是一致的。但是,很对技术细节上由于实现的不同而不同。“Gang of Four“的设计模式使用c++写的,但开发者更倾向于希望用比较流行的语言比如ruby,python。书中的模式对各种系统是可用的,而不仅仅对hadoop本身。你可以用书中的例子代码作为自己开发的向导。
MapReduce History
为什么说写本MapReduce设计模式的书是个很好的想法呢?确切的一点,社区的强势和范例的普遍使用度已经达到了可以写一些综合的开发者可以共享的设计模式的程度。几年前,当hadoop还在摇篮里的时候,还没被人们理解它能够胜任什么。但是MapReduce被采纳的速度是值得注意的。它来自于2004年google一篇有趣的论文,并被迅速接受,在2012年成为业界分布式数据处理的标准。
MapReduce具体的起源是有争议的,但是我们最先想起的是2004年下载的那篇Jeffrey Dean and SanjayGhemawat写的论文:“MapReduce: Simplified Data Processing on LargeClusters”。这篇文章描述了google如何分割,处理,整合他们令人难以置信的大数据集。
随后,开源软件先驱Doug Cutting,开始做MapReduce的实现用于解决可扩展数据,去构建开源搜索引擎Nutch。随后yahoo也开始投入研究,Hadoop分割出来成为apache顶级项目。如今,很多个人和组织都在为hadoop项目做贡献。每一次新的发行版都有新的功能和性能上的提升。
几个其他开源项目都是在hadoop基础之上建设的,并且数量还在增长。许多比较流行的,比如:pig,hive,hbase,Mahout,zookeeper。Doug Cutting和其他hadoop专家已经多次提到,hadoop将会成为分布式应用程序运行的分布式操作系统的核心。在这本书里,我们将会用最少通用的规范解释hadoop ecosystem和java MapReduce的例子。在一些章节模式相似的部分,我们将会概述pig和hive sql的对比。
MapReduce and Hadoop Refresher
这部分的重点是快速复习hadoop中的MapReduce,因为书中的代码例子是基于hadoop的。初学者可以进一步参考的资源有Tom White’s Hadoop:The Definitive Guide或者Apache Hadoop website。这些书可以帮你搭建一个可以运行书中例子的完整的环境。
Hadoop MapReduce jobs可以切分成一系列运行于分布式集群中的map和reduce任务,每个任务只运行全部数据的一个指定的子集,以此达到整个集群的负载平衡。Map任务通常加载,解析,转换,过滤数据,每个reduce处理map输出的一个子集。Reduce任务会去map任务端copy中间数据来完成分组,聚合。用这样一种简明直接的范式,从简单的数值聚合到复杂的join操作和笛卡尔操作,解决这么广泛的问题真的令人难以置信。
mapReduce 的输入是hdfs上存储的一系列文件集。在hadoop中,这些文件被一种定义了如何分割一个文件成分片的input format来分割,一个分片是一个文件基于字节的可以被一个map任务加载的一个块。
每个map任务被分为以下阶段:record reader,mapper,combiner,partitioner。Map任务的输出叫中间数据,包括keys和values,发送到reduce端。Reduce任务分为以下阶段:shuffle,sort,reduce,output format。运行map任务的节点会尽量选择数据所在的节点。这种情况下,不会出现网络传输,在本地节点就可以完成计算。
Record reader
Record reader会把根据input fromat生成输入分片翻译成records。Record reader的目的是把数据解析成记录,而不是解析数据本身。它把数据以键值对的形式传递给mapper。通常情况下键是偏移量,值是这条记录的整个字节块。自定义record reader 超出本书的范围。我们假设你有了处理数据适合的record reader。
MapMap阶段,会对每个从record reader处理完的键值对执行用户代码,这些键值对又叫中间键值对。键和值的选择不是任意的,并且对MapReduce job的成功非常重要。键会用来分组,值是reducer端用来分析的数据。这本书会在设计模式方面提供大量的细节去解释键值对的选择。设计模式之间一个主要的区别是键值对的语义。
CombinerCombiner 是一个map阶段分组数据,可选的,局部reducer。它根据用户提供的方法在一个mapper范围内根据中间键去聚合值。例如:数的总和是各个部分数量的和,你可以先计算中间的数目,最后再把所有中间数目加起来。很多情况下,这样能减少数据的网络传输量。发送(hello world,1)三次很显然要比发送(hello world,3)需要更多的网络传输字节量。Combiners可以被广泛的模式替换。很多hadoop开发者忽视combiner,但能获得更好的性能。我们需要指出的是哪一种模式用combiner有好处,哪一种不能用combiner。Combiner不会保证总会执行,所以它是一个整体逻辑。
PartitionerPartitioner会获取从mapper(或combiner)来的键值对,并分割成分片,每个reducer一个分片。默认用哈希值,典型使用md5sum。然后partitioner根据reduce的个数执行取余运算:key.hashCode() % (number of reducers)。这样能随即均匀的根据key分发数据到reduce,但仍然要保证不同mapper的相同key要到同一个reduce。Partitioner也可以自定义,使用更高级的样式,例如排序。然而,更改partitioner很少用。Partitioner的每个map的数据会写到本地磁盘,并等待对应的reducer检测,拿走数据。
Shuffle and sortReduce任务开始于shuffle和sort阶段。这一阶段获取partitioner的输出文件,并下载到reduce运行的本地机器。这些分片数据会根据key合并,排序成一个大的数据文件。排序的目的是让相同的key相邻,方便在reduce阶段值得迭代处理。这一阶段不能自定义,由框架自动处理。需要做的只是key的选择和可以自定义个用于分组的比较器。
ReduceReduce 任务会把分组的数据作为输入并对每个key组执行reduce方法代码。方法会传递key和可以相关的所有值得迭代集合。很多的处理会在这个方法里执行,也就会有很多的模式。一旦reduce方法完成,会发送0或多个键值对到output format。跟map一样,不同的reduce依据不同的逻辑情形而不同。
Output formatOutput format会把reduce阶段的输出键值对根据record writer写到文件里。默认用tab分割键值对,用换行分割不同行。这里也可以自定义为更丰富的输出格式,最后,数据被写到hdfs,自定义 output format也超出本书范围。
Hadoop Example: Word Count
现在你已经清楚了整个mapreduce的过程,下面看一个简单的例子。单词计数程序是mapreduce界权威的例子。简单直接表明mapreduce处理的高效。很多人抱怨单词计数例子被过度使用,但愿书中后面的部分能弥补这一点。
在这个例子中,我们使用StackOverflow上用户提交的word count。拿出文本域的一部分处理,我们就可以看到每个单词出现的次数。其中一条记录是这样的:
<row Id="8189677" PostId="6881722" Text="Have you looked at Hadoop?"
CreationDate="2011-07-30T07:29:33.343" UserId="831878" />
Id是8189677,邮编6881722,用户编码831878。邮编和用户编码可以作为其它数据部分的外键。后面的join模式中会讲怎样做数据集的join。
下面一段代码是一段框架代码,使用它可以编写不同的mapreduce job并运行。这段代码是通用的,被誉为“boiler plate”。你会看到我们所讲的大部分模式都用这套框架。
这段代码来自hadoop中“word count”例子:
importjava.io.IOException;
importjava.util.StringTokenizer;
importjava.util.Map;
importjava.util.HashMap;
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.io.IntWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.mapreduce.Job;
importorg.apache.hadoop.mapreduce.Mapper;
importorg.apache.hadoop.mapreduce.Reducer;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
Hadoop Example: Word Count | 7
importorg.apache.hadoop.util.GenericOptionsParser;
importorg.apache.commons.lang.StringEscapeUtils;
public classCommentWordCount{
public static classWordCountMapper
extendsMapper<Object,Text,Text,IntWritable> {
...
}
public static classIntSumReducer
extendsReducer<Text,IntWritable,Text,IntWritable> {
...
}
public staticvoidmain(String[]args)throwsException{
Configuration conf=newConfiguration();
String[]otherArgs=
newGenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length!=2) {
System.err.println("Usage: CommentWordCount <in> <out>");
System.exit(2);
}
Job job=newJob(conf,"StackOverflow Comment Word Count");
job.setJarByClass(CommentWordCount.class);
job.setMapperClass(WordCountMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job,newPath(otherArgs[0]));
FileOutputFormat.setOutputPath(job,newPath(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
框架的目的是编排job。Main方法的前几行用于解析命令行参数。然后用计算类,输入输出路径设置job。大致如此。仅仅保证类名正确和输出key,value types匹配reduce(此处原文是map,本人认为有误)的输出类型。
你将看到不同模式之间在job.setCombinerClass上会有不同,在一些情况下,combiner由于reduce的特殊性而不能使用。还有的情况下,combiner类不同于reduce类。在“word count”中,combiner的使用简单,高效。
下面是解析,准备数据的mapper代码。先清除掉单引号,非字符用空格代替等文本清洗后,文本被分割成单词列表,中间key就是单词本身,value为1。这意味着每个单词都会看到一次。如果一行里某单词出现两次,我们也会在单词列表中看到这个键值对“单词:1“两次。最后,所有的数据都会加到每个单词的全局计数字段里去。
public static classWordCountMapper
extendsMapper<Object,Text,Text,IntWritable> {
private final staticIntWritable one=newIntWritable(1);
privateText word=newText();
publicvoidmap(Object key,Text value,Context context)
throwsIOException,InterruptedException{
// Parse the input string into a nice map
Map<String,String>parsed= MRDPUtils.transformXmlToMap(value.toString());
// Grab the "Text" field, since that is what we are counting over
String txt=parsed.get("Text");
// .get will return null if the key is not there
if(txt==null) {
// skip this record
return;
}
// Unescape the HTML because the data is escaped.
txt=StringEscapeUtils.unescapeHtml(txt.toLowerCase());
// Remove some annoying punctuation
txt=txt.replaceAll("'","");// remove single quotes (e.g., can't)
txt=txt.replaceAll("[^a-zA-Z]"," "); // replace the rest with a space
// Tokenize the string by splitting it up on whitespace into
// something we can iterate over,
// then send the tokens away
StringTokenizer itr=newStringTokenizer(txt);
while(itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word,one);
}
}
}
第一个方法, MRDPUtils.transformXmlToMap,是一个解析数据时出于友好提示的打印帮助信息的方法。你会看到我们的例子经常用到。它主要获取一行xml格式的记录(有着透明的格式)并根据属性把值放入map中。
下面,要把注意力放到wordcountMapper类,代码有一点复杂。大多数工作都在这里做。第一个要注意的是父类的类型:
Mapper<Object,Text,Text,IntWritable>
它们分别是,输入键,输入值,输出键,输出值。我们不关心输入键,所以用Object。因为我们一行一行以文本格式读入数据,所以输入值为Text(hadoop中特殊的string类型)。输出键和值是Text和IntWritable是因为我们使用单词作为键,出现次数作为值。
Notice:mapper的输入key和value是由job的配置类FileInputFormat指定的。默认实现类是TextInputformat,字节偏移量作为键,是LongWritable类型,文本值作为值,是Text类型。使用不同的input format会是不同的键值类型。
直到代码底部StringTokenizer的使用,我们仅仅是清洗文本。我们反转义数据,是因为文本被存储成转义的方式以方便xml解析。下一步,我们删除杂散的标点符号以便统一认为“hadoop!””hadoop?””hadoop”是相同的单词。最后,给每个单词赋值1,表示出现次数。Mapreduce框架然后运行shuffle和sort,输出的键值对给reduce任务。
最后看下reducer代码,相对来说比较简单。Reduce方法,每次以相同key为一组来调用,在这里是以每个单词为一组。通过迭代数值类型的值得集合,来计算求和。最终的结果就是每个单词出现的次数。
public static classIntSumReducer
extendsReducer<Text,IntWritable,Text,IntWritable> {
privateIntWritable result=newIntWritable();
publicvoidreduce(Text key,Iterable<IntWritable>values,
Context context)throwsIOException,InterruptedException{
intsum=0;
for(IntWritable val:values) {
sum+=val.get();
}
result.set(sum);
context.write(key,result);
}
}
跟mapper一样,我们通过父类模板指定了输入输出类型。输入键值必须匹配map的输出键值。Reduce的输出键值必须匹配job配置的FileOutputFormat中的内容。在这里,我们使用默认的TextOutputFormat,能将任意两个writable类型对象作为输出。
Reduce方法跟map的签名不同,那就是,在所有值(同一key)上进行迭代而不是仅处理一个值。那是因为你可以迭代同一个key的所有值。Reduce中的key是非常重要的,相比于map中的key来说。
传递给context.Write的数据会写到输出文件。每个reduce一个文件,如果你想把它们合起来还必须进一步处理。
Pig and Hive
Hive和pig在hadoop生态系统中对MapReduce设计模式来讲不是很需要。然而,我们也要找机会解释为什么设计模式仍然很重要。
Pig和hive是更高级抽象的MapReduce。它们提供的接口里对map或reduce什么都不做,但是系统会解析高级语言成为mapreduce jobs。跟关系型数据库中解析sql为具体数据操作的行为的执行计划很相似,hive和pig翻译它们的语句为MapReduce操作。
正如在这本书里相同的部分我们将要看到的,pig和hiveql相比于原生java写的hadoop程序来说,显得更为整洁。例如,用java解释全局排序要写几页的代码,但用pig仅仅几行代码。
为什么在有可选择的hive和pig时,我们任然写java mapreduce呢?为什么本书的作者花了很多时间解释怎么用上百行代码实现一件事情,而相同的事情也可以用几行代码解决?这里有两个主要的原因。
第一,这样可以从更底层理解MapReduce的工作原理。如果开发者理解了pig是怎样运行reduce端join,他将会做出更明智的选择。不理解MapReduce原理而去使用pig和hive可以导致危险的行为。你可以从更高一层的接口受益并不代表你可以忽视某些细节。在大型MapReduce集群上更应遵守必要的规则。
第二,到目前为止,hive和pig还没有完全实现应有的功能和成熟。很明显还没达到他们的全部潜能。现在,他们不能把用java写的MapReduce完全实现。这将随着发行版本的不断发行而改变一些。比如说,在pig0.6版本,你的团队50%的分析能用pig,而在0.9版本中,会达到90%。伴随着每一版的发行,随之而来的也有更高一层的抽象。有趣的事情是,像软件工程这类事物的趋势,剩下的10%问题不能用更高级的抽象解决会受到到多数人的批评和反对。这就像,java语言已经成为最流行的工具语言,而一些人实际上还在用汇编语言。
当你能够用hive或pig写MapReduce语言的时候,一些主要的高级抽象带来的好处有,可读性,可维护性,开发周期和自动优化。很少出现常见的性能问题是间接的严重的原因导致的。这些分析成批运行并已经运行了几分钟,那么多一两分钟又有什么关系?有些时候,hive或pig中的执行计划的优化比你写的代码要好的多。在处理小部分数据时,hive或pig会增加的额外的时间有点多,这时你应该写java MapReduce。
Pig和hive比其它东西更能影响MapReduce的设计模式。他们出现的新特性可以成为MapReduce中的设计模式。同样的,更多的设计模式也会因MapReduce而发展,很多流行的模式会成为更高层次优秀的抽象。
Pig和hive有他们自己的模式,随着专家们解决越来越多的问题,他们也会记录它们。Hive得益于是用了几十年的sql样式,但不是所有的sql在hive中都适用,反过来也一样。可能随着这些平台越来越流行,cookbook和design pattern类的书会写它们。