mahout phase 应用之 RecommenderJob
上篇博客说了下mahout的phase参数的含义,但是没有涉及到具体的应用,上次也只是说了一下这两个参数,startPhase和endPhase的好处:当运行RecommenderJob时前面的两个phase都运行成功了,但是后面的运行出错,那么是 否要继续从第一个phase开始运行呢,其实完全没有必要,可以设置startPhase和endPhase这两个参数,直接跳过前面两个Phase。的确是,这个就是它的用处吧(或许还有其他用处?)下面就针对RecommenderJob来进行分析吧
看下图:
这个图针对RecommenderJob进行了分析,第二行的1,2,。。表示当前的Phase包含的MR数量;第三行是每个Phase运行后产生的文件及文件目录。下面说下我的跳过Phase的想法:由上面的图可以看出,每个Phase的输出目录文件是不一样的,所以我采用的做法是运行一遍RecommenderJob,如果有错,那么就去检测每个Phase的输出目录,如果里面有文件,那么就说明这个Phase是正常运行的,反之亦然;
下面上代码:
PhaseUtils.java:
package org.fansy.date1203.mahout.phase;import static org.fansy.date1203.mahout.phase.PhaseUtils.*;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.util.ToolRunner;import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;public class PhaseCheckDriver {/** * 原地运行任务 * @throws Exception */public static void main(String[] args) throws Exception {if(args.length!=2){System.out.print("Usage: <input><output>");System.exit(-1);}String arg="--input "+TEMP_PATH+args[0]+" --output "+TEMP_PATH+args[1]+" --booleanData false --similarityClassname SIMILARITY_COOCCURRENCE "+"--startPhase 0 --tempDir "+CF_TEMP_DIR;String[] tempArg=arg.split(" ");int temp=ToolRunner.run(new Configuration(), new RecommenderJob(),tempArg);if(temp==0){System.exit(0); // 程序正常退出}int times=3; // 继续运行任务三次,然后退出boolean flag_phase1=false;boolean flag_phase2=false;boolean flag_phase3=false;boolean flag_phase4=false;int startPhase=0;while(times-->=0){flag_phase1=PhaseUtils.checkFilesInPhaseOne();if(flag_phase1){flag_phase2=PhaseUtils.checkFilesInPhaseTwo();startPhase++;}if(flag_phase2){flag_phase3=PhaseUtils.checkFilesInPhaseThree();startPhase++;}if(flag_phase3){flag_phase4=PhaseUtils.fileExists(TEMP_PATH+args[1]);startPhase++;}if(flag_phase4){startPhase++;}if(startPhase>=4){break; // 说明4个phase运行成功,退出程序}// 否则的话,删除_SUCCESS文件,调整参数继续运行switch(startPhase){case 1:PhaseUtils.deleteFilesInPhaseOne();break;case 2:PhaseUtils.deleteFilesInPhaseOne();PhaseUtils.deleteFilesInPhaseTwo();break;case 3:PhaseUtils.deleteFilesInPhaseOne();PhaseUtils.deleteFilesInPhaseTwo();break;}// arg="--input "+TEMP_PATH+args[0]+" --output "+TEMP_PATH+args[1]+" --booleanData false --similarityClassname SIMILARITY_COOCCURRENCE "+"--startPhase "+startPhase+" --tempDir "+CF_TEMP_DIR;temp=ToolRunner.run(new Configuration(), new RecommenderJob(),tempArg);if(temp==0){System.exit(0); // 程序正常退出}}}}说明:上面的代码中的删除_SUCCESS文件是必须的,这个和我的测试方法有关,先说下我的测试方法吧。
我先运行了一遍RecommenderJob(操作A),然后屏蔽PhaseCheckDriver里面的第一个RecommenderJob,同时删除操作A的phase2产生的文件,然后运行PhaseCheckDriver,看是否有跳过并且是否出来最后的结果;
因为操作A 是成功的,所以在每个目录里会有_SUCCESS文件,当有这个文件的时候,直接使用startPhase=1跳过第phase0会显示出错,所以我在程序里面用代码删除了这个文件。
注: 上面的代码还未测试过;
分享,成长,快乐