Passing Parameters in Hadoop
When writing a MapReduce program, you often need to pass parameters into the Map phase. For example, when filtering data in a Mapper you typically need a filter list, so the parameter to pass in is the set of values to filter against.
A simple way to pass parameters in Hadoop is to use Configuration's set() and get() methods:
void Configuration.set(String key, String value)
String Configuration.get(String key)
The drawback of this simple approach is that the value being passed must be a String, which is somewhat limiting.
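For example, the driver can store a small parameter in the job configuration and the Mapper can read it back in its configure() method. Below is a minimal sketch, assuming a hypothetical property key filter.keywords and a comma-separated encoding of the filter list (neither is part of the original example):

// Driver side: the value must be encoded as a String.
JobConf conf = new JobConf(Hadoop.class);
conf.set("filter.keywords", "apple,bag,cat"); // hypothetical key and encoding

// Mapper side: decode the String back into a set in configure().
private Set<String> dictSet = new HashSet<String>();

public void configure(JobConf conf) {
    String keywords = conf.get("filter.keywords");
    if (keywords != null) {
        for (String word : keywords.split(",")) {
            dictSet.add(word);
        }
    }
}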
Configuration's get() and set() methods also place a memory limit on the parameters being passed: if a parameter is too large, the job will fail. To pass a large amount of data (such as a dictionary file), you can use the DistributedCache instead. First upload the file to HDFS (e.g., with hadoop fs -put), then read it in the Mapper's configure() method. An example follows.
The dictionary file dictory.list has the following format:
apple
bag
cat
…
The JobConf is configured as follows:
JobConf conf = new JobConf(Hadoop.class);
// Register the HDFS file so it is copied to each task's local disk.
DistributedCache.addCacheFile(new Path("dictory.list").toUri(), conf);
The file is then read on the Mapper side:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public static class HadoopMap extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, LongWritable>, JobConfigurable {

    private Set<String> dictSet = new HashSet<String>();

    // Called once before any map() calls; load the cached dictionary file here.
    public void configure(JobConf conf) {
        Path[] pathwaysFiles;
        try {
            // Local paths of the files registered via DistributedCache.addCacheFile().
            pathwaysFiles = DistributedCache.getLocalCacheFiles(conf);
            for (Path path : pathwaysFiles) {
                BufferedReader fis = new BufferedReader(new FileReader(path.toString()));
                String line = "";
                while ((line = fis.readLine()) != null) {
                    dictSet.add(line); // one dictionary entry per line
                }
                fis.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
        …
    }
}
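For completeness, here is a minimal sketch of what the elided map() body might look like when dictSet is used as a filter; the whitespace tokenization and the emitted count of 1 are illustrative assumptions, not part of the original example:

public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
    // Hypothetical filter: emit only tokens that appear in the dictionary.
    for (String token : value.toString().split("\\s+")) {
        if (dictSet.contains(token)) {
            output.collect(new Text(token), new LongWritable(1));
        }
    }
}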