首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

WordCount 源码浅析(一)

2012-06-28 
WordCount 源码浅析(1):以 WordCount 为例子开始学习 Hadoop,试着去分析 Hadoop 的工作机制。这篇文章的目的是分析从创建 Job 到读取 Configuration 的过程。

WordCount 源码浅析(1)

以 WordCount 为例子开始学习 Hadoop,试着去分析 Hadoop 的工作机制。这篇文章的目的是分析从创建 Job 到读取 Configuration 的过程。

package com.fnk.hadoop;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLDecoder;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Properties;
import java.util.StringTokenizer;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.w3c.dom.DOMException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

/*
 * NOTE: this listing is an article excerpt, not a single compilable file.
 * WordCount is the user program; the JobConf and Configuration classes that
 * follow are abridged snippets of the Hadoop classes of the same names,
 * reproduced to trace what happens when a job is created. Members they rely
 * on but that are not shown here (LOG, get, set, getResource,
 * addDefaultResource, finalParameters, deprecatedString, the MAPRED_*
 * constants, ...) are elided from the excerpt.
 */

/**
 * Word-count job written against the old {@code org.apache.hadoop.mapred} API.
 *
 * <p>Input lines are split on commas; each token is counted.
 */
public class WordCount {

    /** Mapper: emits {@code (token, 1)} for every comma-separated token of a line. */
    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line, ",");
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    /** Reducer (also used as combiner): sums the counts collected for a key. */
    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    /**
     * Configures and submits the job.
     *
     * @param args {@code args[0]} is the input directory, {@code args[1]} the
     *             output directory prefix (the current time in milliseconds is
     *             appended to it)
     * @throws Exception if job submission or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Hand the current class to JobConf and initialise the JobConf.
        JobConf conf = new JobConf(WordCount.class);
        System.out.println(conf.get("fs.default.name"));
        conf.setJobName("wordcount");

        // Type of the output key.
        conf.setOutputKeyClass(Text.class);
        // Type of the output value.
        conf.setOutputValueClass(IntWritable.class);

        // Mapper class.
        conf.setMapperClass(Map.class);
        // Combiner class: reduces the amount of data transferred between the
        // mappers and the reducers to improve performance. Its input and
        // output key types must match, and so must its input and output
        // value types.
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // Input and output directories, given in the form
        //   hdfs://localhost:9000/input
        //   hdfs://localhost:9000/output
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf,
                new Path(args[1] + System.currentTimeMillis()));

        JobClient.runJob(conf);
    }
}

/**
 * Abridged excerpt of Hadoop's {@code JobConf}: only the pieces exercised by
 * {@code new JobConf(WordCount.class)} and {@code setMapperClass} are shown.
 */
public class JobConf extends Configuration {

    /**
     * Initialises the JobConf.
     *
     * @param exampleClass a class whose containing jar (if any) is recorded
     *                     as the job jar
     */
    public JobConf(Class<?> exampleClass) {
        // Record the jar that contains exampleClass.
        setJarByClass(exampleClass);
        checkAndWarnDeprecation();
    }

    /**
     * Sets the mapper class by delegating to {@code Configuration.setClass}.
     *
     * <p>The article listed this method right after {@code WordCount.main};
     * it is placed here because {@code main} calls it on a JobConf instance
     * and it relies on {@code setClass}.
     *
     * @param theClass the mapper implementation to register
     */
    public void setMapperClass(Class<? extends Mapper> theClass) {
        setClass("mapred.mapper.class", theClass, Mapper.class);
    }

    /**
     * If {@code cls} was loaded from a jar, stores that jar under the
     * {@code "mapred.jar"} configuration key; otherwise does nothing.
     *
     * @param cls the class whose containing jar should be looked up
     */
    public void setJarByClass(Class<?> cls) {
        String jar = findContainingJar(cls);
        if (jar != null) {
            setJar(jar);
        }
    }

    /**
     * Looks for a jar that contains {@code my_class}.
     *
     * @param my_class the class to locate
     * @return the decoded path of the first jar found that contains the class
     *         file, or {@code null} if the class is not inside a jar
     * @throws RuntimeException wrapping any {@link IOException} raised while
     *         enumerating resources
     */
    private static String findContainingJar(Class<?> my_class) {
        ClassLoader loader = my_class.getClassLoader();
        String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
        try {
            for (Enumeration<URL> itr = loader.getResources(class_file);
                    itr.hasMoreElements();) {
                URL url = itr.nextElement();
                // Is this occurrence of the class file inside a jar?
                if ("jar".equals(url.getProtocol())) {
                    String toReturn = url.getPath();
                    if (toReturn.startsWith("file:")) {
                        toReturn = toReturn.substring("file:".length());
                    }
                    toReturn = URLDecoder.decode(toReturn, "UTF-8");
                    // Drop the "!/path/inside/jar" suffix, keeping the jar path.
                    return toReturn.replaceAll("!.*$", "");
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    }

    /**
     * Stores the jar name under the {@code "mapred.jar"} configuration key.
     *
     * @param jar path of the job jar
     */
    public void setJar(String jar) {
        set("mapred.jar", jar);
    }

    /**
     * Warns if the {@code MAPRED_TASK_MAXVMEM_PROPERTY} setting is present,
     * pointing the user at the properties named in the message.
     *
     * <p>Article author's note: this lookup goes through {@code Configuration}
     * and, being the first read on this object, is what causes
     * {@code loadResource} to run and pull all configuration entries into
     * memory. (That lazy-loading path is not part of this excerpt.)
     */
    private void checkAndWarnDeprecation() {
        if (get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
            LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
                    + " Instead use " + JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY
                    + " and " + JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
        }
    }
}

/**
 * Abridged excerpt of Hadoop's {@code Configuration}: the static initialiser,
 * {@code setClass} and {@code loadResource}.
 */
public class Configuration implements Iterable<java.util.Map.Entry<String, String>>,
        Writable {

    static {
        // print deprecation warning if hadoop-site.xml is found in classpath
        ClassLoader cL = Thread.currentThread().getContextClassLoader();
        if (cL == null) {
            cL = Configuration.class.getClassLoader();
        }
        if (cL.getResource("hadoop-site.xml") != null) {
            LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
                    "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
                    + "mapred-site.xml and hdfs-site.xml to override properties of " +
                    "core-default.xml, mapred-default.xml and hdfs-default.xml " +
                    "respectively");
        }
        addDefaultResource("core-default.xml");
        addDefaultResource("core-site.xml");
    }

    /**
     * Stores the name of {@code theClass} under {@code name}.
     *
     * @param name     configuration key
     * @param theClass class whose name is stored
     * @param xface    type that {@code theClass} must be assignable to
     * @throws RuntimeException if {@code theClass} is not assignable to
     *         {@code xface}
     */
    public void setClass(String name, Class<?> theClass, Class<?> xface) {
        // xface must be a supertype (or the same type) of theClass.
        if (!xface.isAssignableFrom(theClass)) {
            throw new RuntimeException(theClass + " not " + xface.getName());
        }
        set(name, theClass.getName());
    }

    /**
     * Reads the configuration entries of one resource into {@code properties}.
     *
     * @param properties destination for the {@code name -> value} pairs
     * @param name       the resource: a {@link URL}, a classpath resource name
     *                   ({@link String}), a {@link Path} to a local file, an
     *                   {@link InputStream}, or an already-parsed
     *                   {@link Element}
     * @param quiet      if {@code true}, a missing resource is silently
     *                   skipped and "parsing ..." messages are not logged
     * @throws RuntimeException if the resource is missing and {@code quiet} is
     *         {@code false}, or wrapping any parse/IO failure
     */
    private void loadResource(Properties properties, Object name, boolean quiet) {
        try {
            DocumentBuilderFactory docBuilderFactory
                    = DocumentBuilderFactory.newInstance();
            // ignore all comments inside the xml file
            docBuilderFactory.setIgnoringComments(true);

            // allow includes in the xml file
            docBuilderFactory.setNamespaceAware(true);
            try {
                docBuilderFactory.setXIncludeAware(true);
            } catch (UnsupportedOperationException e) {
                LOG.error("Failed to set setXIncludeAware(true) for parser "
                        + docBuilderFactory
                        + ":" + e,
                        e);
            }
            DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
            Document doc = null;
            Element root = null;

            if (name instanceof URL) {                  // an URL resource
                URL url = (URL) name;
                if (url != null) {
                    if (!quiet) {
                        LOG.info("parsing " + url);
                    }
                    doc = builder.parse(url.toString());
                }
            } else if (name instanceof String) {        // a CLASSPATH resource
                URL url = getResource((String) name);
                if (url != null) {
                    if (!quiet) {
                        LOG.info("parsing " + url);
                    }
                    doc = builder.parse(url.toString());
                }
            } else if (name instanceof Path) {          // a file resource
                // Can't use FileSystem API or we get an infinite loop
                // since FileSystem uses Configuration API.  Use java.io.File instead.
                File file = new File(((Path) name).toUri().getPath())
                        .getAbsoluteFile();
                if (file.exists()) {
                    if (!quiet) {
                        LOG.info("parsing " + file);
                    }
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    try {
                        doc = builder.parse(in);
                    } finally {
                        in.close();
                    }
                }
            } else if (name instanceof InputStream) {
                try {
                    doc = builder.parse((InputStream) name);
                } finally {
                    ((InputStream) name).close();
                }
            } else if (name instanceof Element) {
                root = (Element) name;
            }

            if (doc == null && root == null) {
                if (quiet) {
                    return;
                }
                throw new RuntimeException(name + " not found");
            }

            if (root == null) {
                root = doc.getDocumentElement();
            }
            if (!"configuration".equals(root.getTagName())) {
                LOG.fatal("bad conf file: top-level element not <configuration>");
            }
            NodeList props = root.getChildNodes();
            for (int i = 0; i < props.getLength(); i++) {
                Node propNode = props.item(i);
                if (!(propNode instanceof Element)) {
                    continue;
                }
                Element prop = (Element) propNode;
                if ("configuration".equals(prop.getTagName())) {
                    // Nested <configuration>: load it recursively.
                    loadResource(properties, prop, quiet);
                    continue;
                }
                if (!"property".equals(prop.getTagName())) {
                    LOG.warn("bad conf file: element not <property>");
                }
                NodeList fields = prop.getChildNodes();
                String attr = null;
                String value = null;
                boolean finalParameter = false;
                for (int j = 0; j < fields.getLength(); j++) {
                    Node fieldNode = fields.item(j);
                    if (!(fieldNode instanceof Element)) {
                        continue;
                    }
                    Element field = (Element) fieldNode;
                    // org.w3c.dom.Text is written out in full because the simple
                    // name Text is bound to org.apache.hadoop.io.Text in this listing.
                    if ("name".equals(field.getTagName()) && field.hasChildNodes()) {
                        attr = ((org.w3c.dom.Text) field.getFirstChild()).getData().trim();
                    }
                    if ("value".equals(field.getTagName()) && field.hasChildNodes()) {
                        value = ((org.w3c.dom.Text) field.getFirstChild()).getData();
                    }
                    if ("final".equals(field.getTagName()) && field.hasChildNodes()) {
                        finalParameter = "true".equals(
                                ((org.w3c.dom.Text) field.getFirstChild()).getData());
                    }
                }

                // Ignore this parameter if it has already been marked as 'final'
                if (attr != null && value != null) {
                    if (!finalParameters.contains(attr)) {
                        properties.setProperty(attr, value);
                        if (finalParameter) {
                            finalParameters.add(attr);
                        }
                    } else {
                        LOG.warn(name + ":a attempt to override final parameter: " + attr
                                + ";  Ignoring.");
                    }
                }
            }

        } catch (IOException e) {
            LOG.fatal("error parsing conf file: " + e);
            throw new RuntimeException(e);
        } catch (DOMException e) {
            LOG.fatal("error parsing conf file: " + e);
            throw new RuntimeException(e);
        } catch (SAXException e) {
            LOG.fatal("error parsing conf file: " + e);
            throw new RuntimeException(e);
        } catch (ParserConfigurationException e) {
            LOG.fatal("error parsing conf file: " + e);
            throw new RuntimeException(e);
        }
    }
}
?

?

热点排行