
Hadoop Combiner Invocation Points (unfinished)

2013-12-21 
Call point 1: inside MapTask.MapOutputBuffer.sortAndSpill(), the combiner is run on each partition's records as the in-memory buffer is spilled to disk:

    try {
      for (int i = 0; i < partitions; ++i) {
        IFile.Writer<K, V> writer = null;
        try {
          long segmentStart = out.getPos();
          writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                    spilledRecordsCounter);
          if (combinerRunner == null) {
            // spill directly
            ...
          } else {
            int spstart = spindex;
            while (spindex < endPosition &&
                   kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) {
              ++spindex;
            }
            // Note: we would like to avoid the combiner if we've fewer
            // than some threshold of records for a partition
            if (spstart != spindex) {
              combineCollector.setWriter(writer);
              RawKeyValueIterator kvIter =
                  new MRResultIterator(spstart, spindex);
              combinerRunner.combine(kvIter, combineCollector); // <- the combiner is invoked here
            }
          }
          ....
        } finally {
          ....
        }
      }
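As a concrete illustration of what combinerRunner.combine() does to one partition's records, here is a self-contained sketch in plain Java with no Hadoop dependencies (the CombineSketch name and the List-based record model are assumptions for illustration only): it performs a local reduce over the sorted (key, value) pairs of a partition, so fewer records reach the spill file.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// A minimal stand-in for the combine pass that sortAndSpill() runs on one
// partition: group the partition's sorted records by key and apply a local
// reduce (here: summing counts) before anything is written to the spill file.
public class CombineSketch {
    // Combine sorted (word, count) pairs; LinkedHashMap preserves key order.
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> out = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            out.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> partition = new ArrayList<>();
        partition.add(Map.entry("hadoop", 1));
        partition.add(Map.entry("hadoop", 1));
        partition.add(Map.entry("spill", 1));
        System.out.println(combine(partition)); // {hadoop=2, spill=1}
    }
}
```

The real combiner is the user's Reducer class run map-side; the point of the sketch is only the semantics: identical keys collapse to one record before hitting disk.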


    mergeParts() // merge the per-partition spill files
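mergeParts() merges the already-sorted spill segments of a partition into one sorted output stream. The following is a self-contained sketch of that k-way merge in plain Java (no Hadoop types; the MergeSketch name and int-array "segments" are assumptions standing in for sorted spill segments):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// k-way merge of sorted runs, as mergeParts() does for spill segments:
// keep each segment's current head in a priority queue and repeatedly
// emit the smallest head, advancing that segment's cursor.
public class MergeSketch {
    static List<Integer> merge(List<int[]> segments) {
        // Queue entries are {segmentIndex, offsetWithinSegment},
        // ordered by the element each entry currently points at.
        PriorityQueue<int[]> heads = new PriorityQueue<>(
            Comparator.comparingInt(e -> segments.get(e[0])[e[1]]));
        for (int s = 0; s < segments.size(); s++) {
            if (segments.get(s).length > 0) heads.add(new int[]{s, 0});
        }
        List<Integer> out = new ArrayList<>();
        while (!heads.isEmpty()) {
            int[] e = heads.poll();
            int[] seg = segments.get(e[0]);
            out.add(seg[e[1]]);
            if (e[1] + 1 < seg.length) heads.add(new int[]{e[0], e[1] + 1});
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> spills = List.of(new int[]{1, 4, 9}, new int[]{2, 3, 10});
        System.out.println(merge(spills)); // [1, 2, 3, 4, 9, 10]
    }
}
```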

MapTask.runNewMapper() ->

    if (job.getNumReduceTasks() == 0) {
      output =
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    ....

    output.close(mapperContext); <-


NewOutputCollector.close() ->

    collector.flush(); <-


MapOutputBuffer.flush() ->

    ....

    mergeParts();

          // minSpillsForCombine is initialized in the MapOutputBuffer constructor
          // numSpills is the number of spill files this map task has already written to disk
          if (combinerRunner == null || numSpills < minSpillsForCombine) {
            Merger.writeFile(kvIter, writer, reporter, job);
          } else {
            combineCollector.setWriter(writer);
            combinerRunner.combine(kvIter, combineCollector); // <- combine runs here
          }

