A Brief Analysis of the WordCount Source Code (1)
Using WordCount as a starting point for learning Hadoop, this series tries to work out how Hadoop operates under the hood. The goal of this article is to trace the path from creating the job to reading the Configuration.
First, the complete example:

package com.fnk.hadoop;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

  public static class Map extends MapReduceBase implements
      Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      String line = value.toString();
      // Split each input line on commas and emit (token, 1) for every token
      StringTokenizer tokenizer = new StringTokenizer(line, ",");
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        output.collect(word, one);
      }
    }
  }

  public static class Reduce extends MapReduceBase implements
      Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    // Pass the current class to JobConf and initialize it
    JobConf conf = new JobConf(WordCount.class);
    System.out.println(conf.get("fs.default.name"));
    conf.setJobName("wordcount");

    // Set the output key class
    conf.setOutputKeyClass(Text.class);
    // Set the output value class
    conf.setOutputValueClass(IntWritable.class);

    // Set the mapper class
    conf.setMapperClass(Map.class);
    // Set the combiner class. The combiner cuts down the amount of data
    // shipped from the mappers to the reducers, improving performance.
    // Its input and output key types must match, and its input and
    // output value types must match.
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    // Set the input and output directories, given in the form
    // hdfs://localhost:9000/input and hdfs://localhost:9000/output
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]
        + System.currentTimeMillis()));

    JobClient.runJob(conf);
  }
}
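Before drilling into JobConf, it helps to see what map() actually emits. The harness below is a minimal sketch of my own (the class name MapSmokeTest and the sample input line are invented); it only assumes the mapred API used above is on the classpath, and drives the Map class by hand with a throwaway OutputCollector:

package com.fnk.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MapSmokeTest {
  public static void main(String[] args) throws IOException {
    WordCount.Map mapper = new WordCount.Map();
    // A toy collector that just prints every (key, value) pair it receives
    OutputCollector<Text, IntWritable> collector =
        new OutputCollector<Text, IntWritable>() {
          public void collect(Text key, IntWritable value) {
            System.out.println(key + "\t" + value);
          }
        };
    // The mapper tokenizes on commas, so this prints
    // hello 1, world 1, hello 1 (one pair per line)
    mapper.map(new LongWritable(0), new Text("hello,world,hello"),
        collector, Reporter.NULL);
  }
}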
The call chain we care about starts in the JobConf constructor, which main() invokes first:

public class JobConf extends Configuration {

  // Initialize the JobConf
  public JobConf(Class exampleClass) {
    // Record which jar the example class lives in
    setJarByClass(exampleClass);
    checkAndWarnDeprecation();
  }

  // Set the mapper class; this just delegates to Configuration's setClass
  public void setMapperClass(Class<? extends Mapper> theClass) {
    setClass("mapred.mapper.class", theClass, Mapper.class);
  }

  // If cls is contained in a jar, record that jar under the "mapred.jar" key
  public void setJarByClass(Class cls) {
    String jar = findContainingJar(cls);
    if (jar != null) {
      setJar(jar);
    }
  }

  // Check whether my_class is contained in a jar; if so return the jar's
  // path, otherwise return null
  private static String findContainingJar(Class my_class) {
    ClassLoader loader = my_class.getClassLoader();
    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
    try {
      for (Enumeration itr = loader.getResources(class_file);
          itr.hasMoreElements();) {
        URL url = (URL) itr.nextElement();
        // A "jar" protocol means the class was loaded from a jar
        if ("jar".equals(url.getProtocol())) {
          String toReturn = url.getPath();
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length());
          }
          toReturn = URLDecoder.decode(toReturn, "UTF-8");
          return toReturn.replaceAll("!.*$", "");
        }
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return null;
  }

  // Store the jar's path under the "mapred.jar" key
  public void setJar(String jar) {
    set("mapred.jar", jar);
  }

  // Warn if the deprecated mapred.task.maxvmem property is set, pointing at
  // the properties that replaced it. Note that the get() call here is the
  // first read of the configuration, so it ends up in Configuration's
  // loadResource method, which loads every configured resource into memory.
  private void checkAndWarnDeprecation() {
    if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
          + " Instead use " + JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY
          + " and " + JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
    }
  }
}
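findContainingJar is plain classloader logic with no Hadoop dependency, so it is easy to poke at in isolation. Below is a small standalone sketch of mine (the class name FindJarDemo is invented, and the target class passed in main is an arbitrary choice; any class that was loaded from a jar works):

import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.Enumeration;

public class FindJarDemo {

  // Same logic as JobConf.findContainingJar: resolve the .class resource,
  // and for a "jar:" URL strip the "file:" prefix and the "!/entry" suffix
  // to recover the jar's filesystem path.
  static String findContainingJar(Class<?> myClass) throws IOException {
    ClassLoader loader = myClass.getClassLoader();
    String classFile = myClass.getName().replaceAll("\\.", "/") + ".class";
    for (Enumeration<URL> itr = loader.getResources(classFile);
        itr.hasMoreElements();) {
      URL url = itr.nextElement();
      if ("jar".equals(url.getProtocol())) {
        String path = url.getPath();
        if (path.startsWith("file:")) {
          path = path.substring("file:".length());
        }
        path = URLDecoder.decode(path, "UTF-8");
        return path.replaceAll("!.*$", "");
      }
    }
    return null;
  }

  public static void main(String[] args) throws IOException {
    // With the Hadoop jar on the classpath, this prints the path of the
    // jar that JobConf itself was loaded from
    System.out.println(findContainingJar(org.apache.hadoop.mapred.JobConf.class));
  }
}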
JobConf extends Configuration. When the Configuration class is first loaded, its static initializer registers the default resources and warns if the long-deprecated hadoop-site.xml is still on the classpath:

public class Configuration implements Iterable<Map.Entry<String, String>>,
    Writable {

  static {
    // Print a deprecation warning if hadoop-site.xml is found in the classpath
    ClassLoader cL = Thread.currentThread().getContextClassLoader();
    if (cL == null) {
      cL = Configuration.class.getClassLoader();
    }
    if (cL.getResource("hadoop-site.xml") != null) {
      LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. "
          + "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
          + "mapred-site.xml and hdfs-site.xml to override properties of "
          + "core-default.xml, mapred-default.xml and hdfs-default.xml "
          + "respectively");
    }
    addDefaultResource("core-default.xml");
    addDefaultResource("core-site.xml");
  }

  // Set a class-valued configuration entry
  public void setClass(String name, Class<?> theClass, Class<?> xface) {
    // Check that xface is a supertype of theClass
    if (!xface.isAssignableFrom(theClass))
      throw new RuntimeException(theClass + " not " + xface.getName());
    set(name, theClass.getName());
  }
}
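The isAssignableFrom guard is what makes setClass safe: a class that does not actually implement the requested interface is rejected before its name is ever stored. Here is a tiny dependency-free sketch of the same check (the AssignableDemo, Animal, Dog and Rock names are invented for illustration):

public class AssignableDemo {
  interface Animal {}
  static class Dog implements Animal {}
  static class Rock {}

  // Mirrors the guard in Configuration.setClass
  static void checkClass(Class<?> theClass, Class<?> xface) {
    if (!xface.isAssignableFrom(theClass))
      throw new RuntimeException(theClass + " not " + xface.getName());
    System.out.println(theClass.getName() + " accepted as " + xface.getName());
  }

  public static void main(String[] args) {
    checkClass(Dog.class, Animal.class); // passes: Dog implements Animal
    try {
      checkClass(Rock.class, Animal.class); // Rock does not implement Animal
    } catch (RuntimeException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}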
Everything above ultimately lands in Configuration.loadResource, which parses a single resource, given as a URL, a classpath name, a Path, an InputStream or a DOM Element, into the properties map:

  // Read configuration entries from one resource into properties
  private void loadResource(Properties properties, Object name, boolean quiet) {
    try {
      DocumentBuilderFactory docBuilderFactory =
          DocumentBuilderFactory.newInstance();
      // ignore all comments inside the xml file
      docBuilderFactory.setIgnoringComments(true);
      // allow includes in the xml file
      docBuilderFactory.setNamespaceAware(true);
      try {
        docBuilderFactory.setXIncludeAware(true);
      } catch (UnsupportedOperationException e) {
        LOG.error("Failed to set setXIncludeAware(true) for parser "
            + docBuilderFactory + ":" + e, e);
      }
      DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
      Document doc = null;
      Element root = null;

      if (name instanceof URL) {                  // an URL resource
        URL url = (URL) name;
        if (url != null) {
          if (!quiet) {
            LOG.info("parsing " + url);
          }
          doc = builder.parse(url.toString());
        }
      } else if (name instanceof String) {        // a CLASSPATH resource
        URL url = getResource((String) name);
        if (url != null) {
          if (!quiet) {
            LOG.info("parsing " + url);
          }
          doc = builder.parse(url.toString());
        }
      } else if (name instanceof Path) {          // a file resource
        // Can't use FileSystem API or we get an infinite loop
        // since FileSystem uses Configuration API. Use java.io.File instead.
        File file = new File(((Path) name).toUri().getPath()).getAbsoluteFile();
        if (file.exists()) {
          if (!quiet) {
            LOG.info("parsing " + file);
          }
          InputStream in = new BufferedInputStream(new FileInputStream(file));
          try {
            doc = builder.parse(in);
          } finally {
            in.close();
          }
        }
      } else if (name instanceof InputStream) {
        try {
          doc = builder.parse((InputStream) name);
        } finally {
          ((InputStream) name).close();
        }
      } else if (name instanceof Element) {
        root = (Element) name;
      }

      if (doc == null && root == null) {
        if (quiet)
          return;
        throw new RuntimeException(name + " not found");
      }

      if (root == null) {
        root = doc.getDocumentElement();
      }
      if (!"configuration".equals(root.getTagName()))
        LOG.fatal("bad conf file: top-level element not <configuration>");
      NodeList props = root.getChildNodes();
      for (int i = 0; i < props.getLength(); i++) {
        Node propNode = props.item(i);
        if (!(propNode instanceof Element))
          continue;
        Element prop = (Element) propNode;
        // Nested <configuration> elements are handled recursively
        if ("configuration".equals(prop.getTagName())) {
          loadResource(properties, prop, quiet);
          continue;
        }
        if (!"property".equals(prop.getTagName()))
          LOG.warn("bad conf file: element not <property>");
        NodeList fields = prop.getChildNodes();
        String attr = null;
        String value = null;
        boolean finalParameter = false;
        for (int j = 0; j < fields.getLength(); j++) {
          Node fieldNode = fields.item(j);
          if (!(fieldNode instanceof Element))
            continue;
          Element field = (Element) fieldNode;
          // Text here is org.w3c.dom.Text, not org.apache.hadoop.io.Text
          if ("name".equals(field.getTagName()) && field.hasChildNodes())
            attr = ((Text) field.getFirstChild()).getData().trim();
          if ("value".equals(field.getTagName()) && field.hasChildNodes())
            value = ((Text) field.getFirstChild()).getData();
          if ("final".equals(field.getTagName()) && field.hasChildNodes())
            finalParameter =
                "true".equals(((Text) field.getFirstChild()).getData());
        }

        // Ignore this parameter if it has already been marked as 'final'
        if (attr != null && value != null) {
          if (!finalParameters.contains(attr)) {
            properties.setProperty(attr, value);
            if (finalParameter)
              finalParameters.add(attr);
          } else {
            LOG.warn(name + ":a attempt to override final parameter: " + attr
                + ";  Ignoring.");
          }
        }
      }
    } catch (IOException e) {
      LOG.fatal("error parsing conf file: " + e);
      throw new RuntimeException(e);
    } catch (DOMException e) {
      LOG.fatal("error parsing conf file: " + e);
      throw new RuntimeException(e);
    } catch (SAXException e) {
      LOG.fatal("error parsing conf file: " + e);
      throw new RuntimeException(e);
    } catch (ParserConfigurationException e) {
      LOG.fatal("error parsing conf file: " + e);
      throw new RuntimeException(e);
    }
  }
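So each resource that loadResource accepts is an XML file whose root element is <configuration>, containing <property> elements with <name>, <value> and an optional <final> child; once a name has been marked final, later resources can no longer override it (the finalParameters check above). A made-up resource in the format it parses:

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
    <final>true</final>
  </property>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>
  </property>
</configuration>

This also ties back to the System.out.println(conf.get("fs.default.name")) call in main(): the get() inside checkAndWarnDeprecation is the very first read of the configuration, so by that point loadResource has already parsed the default resources registered in the static block, core-default.xml first and core-site.xml after it, and the println shows whatever value the site file provided.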