When the Hadoop Combiner Is Invoked (unfinished)
MapTask.MapOutputBuffer.sortAndSpill() ->
    try {
      for (int i = 0; i < partitions; ++i) {
        IFile.Writer<K, V> writer = null;
        try {
          long segmentStart = out.getPos();
          writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                    spilledRecordsCounter);
          if (combinerRunner == null) {
            // spill directly
            ...
          } else {
            int spstart = spindex;
            while (spindex < endPosition &&
                   kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) {
              ++spindex;
            }
            // Note: we would like to avoid the combiner if we've fewer
            // than some threshold of records for a partition
            if (spstart != spindex) {
              combineCollector.setWriter(writer);
              RawKeyValueIterator kvIter = new MRResultIterator(spstart, spindex);
              combinerRunner.combine(kvIter, combineCollector); <- the combiner is invoked here (spill time)
            }
          }
          ....
        } finally {
          ....
        }
      }
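What combinerRunner.combine() ultimately runs is the combiner class registered on the job, which is just a Reducer applied to map output before it is spilled and shuffled. A minimal sketch of such a class (the WordCountCombiner name and the word-count logic are illustrative, not taken from the Hadoop source):

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // A combiner is a Reducer run on the map side; this one pre-sums
    // per-word counts so fewer records hit disk and the network.
    public class WordCountCombiner
        extends Reducer<Text, IntWritable, Text, IntWritable> {
      private final IntWritable sum = new IntWritable();

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable v : values) {
          total += v.get();
        }
        sum.set(total);
        context.write(key, sum);
      }
    }

It is registered with job.setCombinerClass(WordCountCombiner.class). Because the framework may apply it zero, one, or several times per key (at spill time above and again at merge time below), its logic must be safe to apply repeatedly.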
MapTask.runNewMapper() ->
    if (job.getNumReduceTasks() == 0) {
      output =
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }
    ....
    output.close(mapperContext); <-
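The branch above also implies that the combiner only matters when the job has at least one reduce task: with zero reducers, map output goes through NewDirectOutputCollector straight to the output format and a configured combiner is never invoked. A driver-side sketch under that assumption (class and job names are illustrative; WordCountCombiner is the sketch above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class CombinerPathDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "combiner path demo");
        job.setJarByClass(CombinerPathDemo.class);
        // job.setMapperClass(...) / job.setReducerClass(...) would be set as usual;
        // omitted here to keep the sketch focused on the combiner path.
        job.setCombinerClass(WordCountCombiner.class);  // combiner sketched earlier
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // With >= 1 reduce task the map side takes the NewOutputCollector /
        // MapOutputBuffer path, so the combiner can run during spills and the
        // final merge; setNumReduceTasks(0) would take the
        // NewDirectOutputCollector branch and the combiner would never run.
        job.setNumReduceTasks(1);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }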
NewOutputCollector.close() ->
    collector.flush(); <-
MapOutputBuffer.flush() ->
    ....
    mergeParts();
        // minSpillsForCombine is initialized in the MapOutputBuffer constructor;
        // numSpills is the number of spill files the map task has already written to disk
        if (combinerRunner == null || numSpills < minSpillsForCombine) {
          Merger.writeFile(kvIter, writer, reporter, job);
        } else {
          combineCollector.setWriter(writer);
          combinerRunner.combine(kvIter, combineCollector); <- the combiner is invoked here (merge time)
        }
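For the merge-time branch, minSpillsForCombine defaults to 3, so the combiner only runs again in mergeParts() when the task produced at least that many spill files. A sketch of tuning it from the job configuration; "mapreduce.map.combine.minspills" is the property name in current Hadoop releases, while the 0.20/1.x code the snippets above appear to come from reads "min.num.spills.for.combine", so treat the exact key as version-dependent:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CombineMinSpillsDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // numSpills is compared against this value in mergeParts(): with fewer
        // spill files the segments go through Merger.writeFile() and the
        // combiner is skipped; raising it makes the merge-time combine rarer.
        conf.setInt("mapreduce.map.combine.minspills", 3);  // default is 3
        Job job = Job.getInstance(conf, "combine minspills demo");
        // ... mapper/combiner/reducer setup as in the driver sketch above ...
      }
    }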