[转发]Hadoop MapReduce程序编写备忘
转发:http://jbm3072.iteye.com/blog/1112741
最近在编写Hadoop 的MapReduce程序的时侯,学到了不少东西,记下来备忘:
1. Mapper和Reducer的上下文执行环境信息:
在编写Mapper类时,一般使用如下的类继承声明:
Java代码
public class DistinctProcessor extends MapReduceBase implements Mapper<Text,Text,Text,Text> {
MapReduceBase里面声明了如下方法:
Java代码
public class MapReduceBase
implements Closeable, JobConfigurable
{
public void close()
throws IOException
{
}
public void configure(JobConf job)
{
}
}
MapReduce框架在创建Mapper对象后,会调用configure方法,实现类可以从job获得到想要的信息,供map函数使用。不过MapReduce在JobConf也放置了关于MapTask的信息,供实现类使用:
Java代码
mapred.job.id String The job id
mapred.jar String job.jar location in job directory
job.local.dir String The job specific shared scratch space
mapred.tip.id String The task id
mapred.task.id String The task attempt id
mapred.task.is.map boolean Is this a map task
mapred.task.partition int The id of the task within the job
map.input.file String The filename that the map is reading from
map.input.start long The offset of the start of the map input split
map.input.length long The number of bytes in the map input split
mapred.work.output.dir String The task's temporary output directory
map.input.file表示读入的文件,这个比较有用,尤其是在根据不同文件名称进行不同处理的时候,可以据此来进行处理。
mapred.tip.id和mapred.job.id表示task和Job id。map.input.length表示此次map处理的文件长度。
总结: 借助Map和Reducer的上下文信息,我们可以在map和reduce函数中增加额外的判断和处理。
2. 使用MultipleOutputs使程序支持多输出
有时候,我们的输出可能不是一类,而是多类,例如统计每月和每周的交易次数。我们可以在collect(key,value)中的key加上不同的标识来表示不同的输出类型。但我们可能更想得到的是两个不同的文件,一个文件记录每月的交易次数,一个文件则记录每周的交易次数。使用MultiOuputs就可以做到。在Hadoop的文档中介绍了相关的例子,这里贴出来:
Java代码
JobConf conf = new JobConf();
conf.setInputPath(inDir);
FileOutputFormat.setOutputPath(conf, outDir);
conf.setMapperClass(MOMap.class);
conf.setReducerClass(MOReduce.class);
...
// Defines additional single text based output 'text' for the job
MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class,
LongWritable.class, Text.class);
// Defines additional multi sequencefile based output 'sequence' for the
// job
MultipleOutputs.addMultiNamedOutput(conf, "seq",
SequenceFileOutputFormat.class,
LongWritable.class, Text.class);
...
JobClient jc = new JobClient();
RunningJob job = jc.submitJob(conf);
...
Job configuration usage pattern is:
public class MOReduce implements
Reducer<WritableComparable, Writable> {
private MultipleOutputs mos;
public void configure(JobConf conf) {
...
mos = new MultipleOutputs(conf);
}
public void reduce(WritableComparable key, Iterator<Writable> values,
OutputCollector output, Reporter reporter)
throws IOException {
...
mos.getCollector("text", reporter).collect(key, new Text("Hello"));
mos.getCollector("seq", "A", reporter).collect(key, new Text("Bye"));
mos.getCollector("seq", "B", reporter).collect(key, new Text("Chau"));
...
}
public void close() throws IOException {
mos.close(); //此处一定要加上!!!!!!!
...
}
}
需要特别注意的就是close方法里面必须要有mos.close(),否则输出无法写出到文件中。
3.使用MultipleInputs
MultipleOutputs支持多输出,那MultipleInputs自然是指支持多输入啦。但是我们在使用FileInputFormat的addInputPath时,就会发现FileInputFormat支持多输入。但这并不意味着MulitpleInputs就无用武之地了。我们看看MultipleInputs的方法,就可以看到MultipleInputs的作用了:
Java代码
public static void addInputPath(JobConf conf, Path path, Class<? extends InputFormat> inputFormatClass) ;
public static void addInputPath(JobConf conf, Path path, Class<? extends InputFormat> inputFormatClass, Class<? extends Mapper> mapperClass)
根据提供的方法,就可以看到MultipleInputs的强大了吧。也就是说使用MultipleInputs你可以使用两个完全不同的输入类型,一个可能是TextInputFormat,另一个则可能是SeqFileInputFormat,也可能是DBInputFormat。而针对每一种输入类型,你还可以指定对应的Mapper类,强大的MultipleInputs可以对不同的输入类型进行融合,消化,然后传给reduce方法进行处理。
4. 多个Job的依赖关系JobControl
使用JobControl可以控制多个Job之间的依赖关系。实现workflow的效果。例如Job2需要在Job1干完之后才能开始干活,那该怎么办?总不能Job1启动执行结束后再启动Job2吧。
使用JobControl可以实现工作流。
在JobControl中可以设置Job与Job之间的依赖关系。JobControl根据依赖关系进行Job的调度。 JobControl实现了Runnable接口,意味着这个类可以在线程中启动运行。在运行时,我们可以调用JobControl的getRunningJobs获取到正在运行的Job。Job里面的JobClient可以得到Job运行的详细信息。