首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > JAVA > Eclipse开发 >

eclipse中调试wordcount-local模式下job运作过程的分析

2014-01-14 
eclipse中调试wordcount--local模式下job运行过程的分析,限于个人水平,有错误的地方请大家予以回复纠正。?

eclipse中调试wordcount--local模式下job运行过程的分析
,限于个人水平,有错误的地方请大家予以回复纠正。

?

环境:主机WIN7+Eclipse4.3,hadoop源码已导入eclipse,虚拟机中运行Ubuntu13.04,伪分布式模式运行hadoop。在eclipse中调试wordcount,为了防止出现Failed to set permissions of path的异常,将org.apache.hadoop.fs.FileUtil类中checkReturnValue方法中的源码注释掉!注意:在主机eclipse中调试,实际上是使用单机模式跑job,与分布式环境的作业运行流程不完全相同,故本篇所讲内容仅具有参考意义。

?

? ? ? ? 入口是job.waitForCompletion方法,进入该方法后,会执行Job类中的submit方法,该方法主要有下面两行代码:

    connect();    info = jobClient.submitJobInternal(conf);

? ? ? ? 我们从这里分两条路,为了方便梳理,将代码按照执行顺序进行编号:

? ? ? ? 1.先看connect()方法,该方法中会new一个JobClient对象,在JobClient的构造函数中会去调用JobClient中的init方法,该方法的内容如下所示:

public void init(JobConf conf) throws IOException {    String tracker = conf.get("mapred.job.tracker", "local");    tasklogtimeout = conf.getInt(      TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);    this.ugi = UserGroupInformation.getCurrentUser();    if ("local".equals(tracker)) {      conf.setNumMapTasks(1);      this.jobSubmitClient = new LocalJobRunner(conf);    } else {      this.rpcJobSubmitClient =           createRPCProxy(JobTracker.getAddress(conf), conf);      this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf);    }          }

因为我这里没有对mapred.job.tracker没有做特别的设置,所以得到的tracker应该是local,这种情况下会去创建一个LocalJobRunner对象。(注意:可以在conf中将mapred.job.tracker设置为hadoop中jobtracker的URL,这种情况会执行else语句块的内容,是会在hadoop中运行该job的,但是也会遇到一些错误,这里先不做介绍啦。)LocalJobRunner对象实现了JobSubmissionProtocol接口,我们知道hadoop中主要使用RPC进行通讯,此接口就是client与JobTracker对象通讯的协议,后面代码中的jobClient对象其实就是通过其内置的jobSubmitClient与JobTracker之间进行交互的。

? ? ? ? 2.接下来我们再来看jobClient.submitJobInternal(conf)这段代码。该方法里面的代码相对来说比较复杂:

? ? ? ? 2.1.首先会调用JobSubmissionFiles.getStagingDir()方法,该方法内部主要有两块儿代码,先看第一个:

Path stagingArea = client.getStagingAreaDir();

该方法内部最终调用的实际上是LocalJobRunner类中的getStagingAreaDir方法,代码如下:

Path stagingRootDir =       new Path(conf.get("mapreduce.jobtracker.staging.root.dir",        "/tmp/hadoop/mapred/staging"));    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();    String user;    randid = rand.nextInt(Integer.MAX_VALUE);    if (ugi != null) {      user = ugi.getShortUserName() + randid;    } else {      user = "dummy" + randid;    }    return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();

从这个方法里我们看以看到staging root dir的生成,该目录主要用来存放job的相关资源(比如我们写的代码),由于此处我所使用的是LocalJobRunner,所以此PATH是在我们本地文件系统中生成的(上面最后一行代码),并且我们可以看到,Staging的根目录路径为${mapreduce.jobtracker.staging.root.dir}/${username}随机数/.staging。

? ? ? ? 我们再来看JobSubmissionFiles.getStagingDir()方法中的另一块代码,这里有一个在windows中跑job的时候常遇到的错误。由于上面代码中的user中含有一个随机数,所以理论上绝大多数情况下我们都需要在staging root dir下面新建一个由用户名和随机数构成的目录,所以,我们会执行

fs.mkdirs(stagingArea, new FsPermission(JOB_DIR_PERMISSION));

?最终我们实际上执行的是RowLocalFileSystem类中的mkdirs(Path f, FsPermission permission)方法,该方法中又会去调用setPermission(f, permission)方法,此方法进一步会调用org.apache.hadoop.fs.FileUtil类的setPermission方法,最终又调用了FileUtil类中的checkReturnValue方法,该方法会抛出一个异常:

//if (!rv) {//      throw new IOException("Failed to set permissions of path: " + p + //                            " to " + String.format("%04o", permission.toShort()));//}

我们前面没有提到rv,这个变量是代表staging目录的File试图去设置linux文件系统中的read权限的结果,但是在windows中这样的操作是失败的,于是rv接收到的值就是false,所以一旦进入checkReturnValue,就会抛出上面的异常。由于事先已将该段代码注释掉了,所以程序可以继续运行下去,可以顺利的退出JobSubmissionFiles.getStagingDir()方法。

? ? ? ? 2.2?JobID jobId = jobSubmitClient.getNewJobId(),这行代码会执行LocalJobRunner类中的getNewJobId方法,该方法中会创建一个JobID对象并返回。接下来执行

copyAndConfigureFiles(jobCopy, submitJobDir);
此方法会将job的配置信息和需要的资源文件(比如files、archives,libjars)拷贝到submitJobDir(staging root dir目录下面一个以JOBID命名的),如果是运行在hadoop环境中,还会把这些文件加入到DistributedCache中。 if (reduces == 0 ? jobCopy.getUseNewMapper() : jobCopy.getUseNewReducer()) { org.apache.hadoop.mapreduce.OutputFormat<?,?> output = ReflectionUtils.newInstance(context.getOutputFormatClass(), jobCopy); output.checkOutputSpecs(context); } else { jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy); } ? ? ? ? 2.4 如果输出路径不存在,就会接着往下运行创建inputSplits?
 FileSystem fs = submitJobDir.getFileSystem(jobCopy); LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir)); int maps = writeSplits(context, submitJobDir); jobCopy.setNumMapTasks(maps);
? ? ? ? 2.4.1 这段代码中主要来看一下writeSplits方法,该方法内部会去判断是否使用的是新的Mapper,如果是,就进一度调用JobClient中的writeNewSplits方法,否则就调用writeOldSplits。我使用的是新的API,所以就进入了writeNewSplits方法,该方法的代码如下:
    Configuration conf = job.getConfiguration();    InputFormat<?, ?> input =      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);    List<InputSplit> splits = input.getSplits(job);    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);    // sort the splits into order based on size, so that the biggest    // go first    Arrays.sort(array, new SplitComparator());    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,        jobSubmitDir.getFileSystem(conf), array);    return array.length;
首先,通过反射得到InputFormat对象,调用其getSplits方法得到输入分片的集合,然后对分片进行排序,SlitComparator是按照分片的大小降序排列的。 之后调用JobSplitWriter.createSplitFiles方法去创建输入分片(位于前面的submitJobDir目录下job.split和job.splitmetainfo以及对应的crc校验文件)。jobCopy.setNumMapTasks(maps);会把返回的分片的数目设为map task的数目。status = jobSubmitClient.submitJob( jobId, submitJobDir.toString(), jobCopy.getCredentials());由于我们是在本地运行的job,所以会调用LocalJobRunner类中的submitJob,如果是在hadoop集群中运行,则会调用JobTracker的submitJob方法。相对于JobTracker,LocalJobRunner类中的submitJob方法非常简单
    Job job = new Job(jobid, jobSubmitDir);    job.job.setCredentials(credentials);    return job.status;
? ? ? ? 2.7.1 进入Job的构造函数?,大部分代码都是比较简单的,先来看下面这几行代码
      this.trackerDistributedCacheManager =        new TrackerDistributedCacheManager(conf, taskController);      this.taskDistributedCacheManager =        trackerDistributedCacheManager.newTaskDistributedCacheManager(            jobid, conf);      taskDistributedCacheManager.setupCache(conf, "archive", "archive");
这里先创建了一个TrackerDistributedCacheManager对象,然后该对象又创建了一个TaskDistributedCacheManager对象,DistributedCache的具体工作流程可以参考这篇bog。之后,会更新(重新写入配置信息)job的xml配置文件。Map<TaskAttemptID, MapOutputFile> mapOutputFiles = Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos, jobId, mapOutputFiles);这段代码会创建一个map task线程的集合,然后放入线程池中执行,map的输出结果文件放在mapOutputFiles中。 TaskAttemptID reduceId = new TaskAttemptID(new TaskID(jobId, false, 0), 0);从这行代码开始,下面的就是Reduce的执行部分。TaskAttemptID是什么东东呢,先来看TaskAttempt吧,TaskAttempt可以理解为一个task,map task或reduce task, 这里对应的是一个reduce的id。ReduceTask reduce = new ReduceTask(systemJobFile.toString(), reduceId, 0, mapIds.size(), 1);reduce.setUser(UserGroupInformation.getCurrentUser().getShortUserName());JobConf localConf = new JobConf(job);localConf.set("mapreduce.jobtracker.address", "local");?然后会将map的输出文件交给reduce,
          for (int i = 0; i < mapIds.size(); i++) {              if (!this.isInterrupted()) {                TaskAttemptID mapId = mapIds.get(i);                Path mapOut = mapOutputFiles.get(mapId).getOutputFile();                MapOutputFile localOutputFile = new MapOutputFile();                localOutputFile.setConf(localConf);                Path reduceIn =                  localOutputFile.getInputFileForWrite(mapId.getTaskID(),                        localFs.getFileStatus(mapOut).getLen());                if (!localFs.mkdirs(reduceIn.getParent())) {                  throw new IOException("Mkdirs failed to create "                      + reduceIn.getParent().toString());                }                if (!localFs.rename(mapOut, reduceIn))                  throw new IOException("Couldn't rename " + mapOut);              } else {                throw new InterruptedException();              }            }
之后会调用该reduce的run方法运行reducer
 reduce.run(localConf, this);
由于在local模式下,reduce的数量只可以是0或1,所以reduce的运行还是比较简单的、具体可以看run方法内部的代码。??

热点排行