数据仓库ETL调度工具的开发(一)
目标
实现一款简单的ETL调度工具,支持几个关键功能:
1、作业依赖关系的配置
一个作业可能依赖于多个作业,同时也可以被多个作业依赖,前面所依赖的作业都执行成功后才能执行后面的作业
2、作业优先级的控制
可能有很多相互之间没有依赖关系的作业,哪个优先执行,需要控制
3、作业并发数的控制
控制同时运行作业的最大个数
4、作业相关的元数据
存储在数据库,比如作业的基本信息:id,作业名称,作业类型,参数值,优先级。作业所在服务器相关信息,作业与作业依赖关系表等等
实现步骤
提供可视化界面维护作业相关的元数据,调度工具从元数据库中读取作业相关信息载入内存,大体实现步骤:
1、作业初始化
将元数据库无任何依赖关系的作业读入作业队列jobqueue,jobqueue为ArrayList类
2、作业依赖关系的控制
通过Map类分别构建Map1存储一个作业id,依赖于哪些作业id,Map2存储一个作业id,被哪些作业id依赖, 一个作业执行成功后,通过Map2通知相关的作业做检查,
借助Map1检索到其子作业,看是否都成功
3、作业优先级的控制
每对jobqueue add一次,重新对jobqueue排序一次,借助Collections.sort,按优先级,提交时间排序
4、作业并发数的控制
作业的Thread类内,声明volatile型变量代表正运行作业的个数,作业运行前以及作业运行完后,加锁修改
源码
以下为控制远程shell脚本作业的Java实现源码,其他类型的作业可直接扩展相应的方法:
作业类
/**
 * Entry of the scheduler's job queue: a job id together with the priority
 * and the enqueue timestamp that the queue ordering is based on.
 *
 * <p>The fields are package-visible because the scheduler classes read
 * {@code jobid} directly.
 */
public class JobInfo {

    /** Priority of the job. */
    int priority;

    /** Time the job was put into the queue, in milliseconds since the epoch. */
    long starttime;

    /** Identifier of the job. */
    int jobid;

    /**
     * Creates a queue entry.
     *
     * @param priority  priority of the job
     * @param starttime enqueue time in milliseconds
     * @param jobid     identifier of the job
     */
    public JobInfo(int priority, long starttime, int jobid) {
        this.jobid = jobid;
        this.starttime = starttime;
        this.priority = priority;
    }

    /** @return the priority given at construction */
    public int getPriority() {
        return this.priority;
    }

    /** @return the enqueue time given at construction */
    public long getStarttime() {
        return this.starttime;
    }

    /** @return the job id rendered as a decimal string */
    @Override
    public String toString() {
        return Integer.toString(this.jobid);
    }
}
主类
import java.io.BufferedReader;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.Statement;import java.util.*;import java.util.concurrent.ConcurrentHashMap;/* * 调度程序 * 无前置关系的作业初始化的时候加入队列 * 构建两个哈希表,一个存储每个作业被哪些作业依赖,便于该作业执行完后通知那些作业去检查是否可开始加入队列了 * 另一个存储每个作业依赖哪些作业,用来查找对应的子作业的状态是否全都OK了 * 如何控制作业啥时候退出,用作业队列和为S作业个数,来判断,如果连续循环多少次后一直是这两个条件满足,x变量,都退出 * 只要中间有一次不连续,做了其他的,比如从队列取了作业,正在并发的>=5,都加x赋0 */public class EtlControl {//Queue <Integer> qe;ArrayList<JobInfo> jobqueue;Map<Integer,ArrayList> refmap;Map<Integer,ArrayList> refedmap;Map<Integer,Integer> stautsmap;Map<Integer,ArrayList<String>> sourcemap; Map<Integer,ArrayList<String>> jobinfomap;//job基本信息Map<Integer,ArrayList<String>> jobparmap;//job参数信息@SuppressWarnings("rawtypes")public EtlControl(ArrayList<JobInfo> jobqueue,Map<Integer,ArrayList> refmap,Map<Integer,ArrayList> refedmap,Map<Integer,Integer> stautsmap,Map<Integer,ArrayList<String>> sourcemap,Map<Integer,ArrayList<String>> jobinfomap,Map<Integer,ArrayList<String>> jobparmap){this.jobqueue=jobqueue;this.refedmap=refedmap;this.refmap=refmap;this.stautsmap=stautsmap;this.sourcemap=sourcemap;this.jobinfomap=jobinfomap;this.jobparmap=jobparmap;}public void init(){DBUtil dbutil=new DBUtil();Statement pstmt=null;Connection conn=null;ResultSet rs=null;ResultSet rs2=null;ResultSet rs3=null;ResultSet rsjobinfo=null;ResultSet rsjobpar=null;PreparedStatement ps=null;String sql="select jobid,priority from etlcontrol_job_info a where not exists (select 1 from etlcontrol b where a.jobid=b.thisjob)";//不依赖任何作业的作业String sql2="select jobid from etlcontrol_job_info "; //所有的作业String sqlref="select refjob from etlcontrol where thisjob=?";//这个作业依赖于哪些作业String sqlrefed="select thisjob from etlcontrol where refjob=?";//这个作业哪些些作业依赖String sqlsource="select source_id,ip,dir,username,password1,nvl(db_sid,'NA'),nvl(port,'NA'),nvl(logdir,'NA') from 
etlcontrol_job_source";String sqljobinfo="select jobid,jobname,kind,source_id,priority from etlcontrol_job_info"; //作业基本信息String sqljobpar="select param_value from etlcontrol_job_paramvalue where jobid=? order by seq";//作业参数String sqljobpar0="select distinct jobid from etlcontrol_job_paramvalue";//作业参数try{conn=dbutil.getConnection("oracle");pstmt=conn.createStatement(); rs=pstmt.executeQuery(sql); // 初始化作业队列,无依赖关系的 while (rs.next()) { //System.out.println(); //qe.offer(rs.getInt(1)); int jobid=rs.getInt(1); int priority=rs.getInt(1); long starttime=new Date().getTime(); jobqueue.add(new JobInfo(priority,starttime,jobid)); } rs=pstmt.executeQuery(sql2); //初始化一个job,循环job, 计算出依赖于哪些作业,以及被哪些作业依赖,分别存在相应的map while(rs.next()) { int thisjob=rs.getInt(1); ps=conn.prepareStatement(sqlref); ps.setInt(1, thisjob); rs2=ps.executeQuery(); ArrayList<Integer> reflist=new ArrayList<Integer>(); while (rs2.next()) { if (rs2.getInt(1)>0) //不依赖任何作业的作业不写入依赖的哈希表 reflist.add(rs2.getInt(1)); } if (reflist.size()>0) refmap.put(thisjob, reflist); ps=conn.prepareStatement(sqlrefed); ps.setInt(1, thisjob); rs2=ps.executeQuery(); ArrayList<Integer> refedlist=new ArrayList<Integer>(); while (rs2.next()) { refedlist.add(rs2.getInt(1)); } if (refedlist.size()>0) refedmap.put(thisjob, refedlist); }//双层循环结束 //将有参数的作业参数值写入内存 rs=pstmt.executeQuery(sqljobpar0); while(rs.next()) { int thisjob=rs.getInt(1); System.out.println("将有参数的作业参数值写入内存::"+thisjob); ps=conn.prepareStatement(sqljobpar); ps.setInt(1, thisjob); rsjobpar=ps.executeQuery(); ArrayList<String> rsjobparlist=new ArrayList<String>(); while (rsjobpar.next()) { rsjobparlist.add(rsjobpar.getString(1)); } jobparmap.put(thisjob, rsjobparlist); } //将source服务器信息写入到内存 rs3=pstmt.executeQuery(sqlsource); //source_id,ip,dir,username,password1,nvl(db_sid,'NA'),nvl(port,'NA') while (rs3.next()) { //存储数据源,id,ip,目录,用户名,密码等信息 ArrayList<String> sourcelist=new ArrayList<String>(); sourcelist.add(rs3.getString(2));//ip sourcelist.add(rs3.getString(3));//dir 
sourcelist.add(rs3.getString(4));//username sourcelist.add(rs3.getString(5));//password sourcelist.add(rs3.getString(6)); sourcelist.add(rs3.getString(7)); //port sourcelist.add(rs3.getString(8)); //logdir sourcemap.put(rs3.getInt(1), sourcelist); } //将每个作业的基本信息写入内存 rsjobinfo=pstmt.executeQuery(sqljobinfo); while (rsjobinfo.next()) { //存储job名字,类型等等 ArrayList<String> jobinfolist=new ArrayList<String>(); jobinfolist.add(rsjobinfo.getString(2));//jobname jobinfolist.add(rsjobinfo.getString(3));//kind jobinfolist.add(rsjobinfo.getString(4));//source jobinfolist.add(rsjobinfo.getString(5));//priority jobinfomap.put(rsjobinfo.getInt(1), jobinfolist); } conn.close(); rs.close(); rs2.close(); rs3.close(); rsjobinfo.close(); rsjobpar.close();}catch(Exception e){e.printStackTrace();}}public void start() throws InterruptedException{ int x=0;//控制作业什么时候退出 int c=0;while (true){synchronized (jobqueue) {if (jobqueue.isEmpty()){ //作业队列为空,正在运行的作业个数为0的时候 x++if (c==0){x++;}jobqueue.wait(6000);}}c=LaunchJob.scnt; //状态为S的作业个数 System.out.println("status is S:"+c); if (c<5 ) //最大并发数不超过5 {//Integer job=qe.poll();//从队列里取作业,如果为NULL,此处用int会报错 JobInfo jobinfo=null; if (!jobqueue.isEmpty()) { jobinfo=jobqueue.remove(0); } if (jobinfo!=null){int job=jobinfo.jobid;if (job>0){x=0;LaunchJob lj=new LaunchJob(job,refmap,refedmap,stautsmap, jobqueue,sourcemap,jobinfomap,jobparmap);lj.start();}} }//大于并发数5,则休息3s等作业运行完 else { x=0;Thread.sleep(3000); }if (x>20)//20次循环都未发生有在运行的作业,则退出主线程{System.out.println("exit");break;}/*if (c>1){System.out.println("exit");break;}*/}}public static void main(String[] args) throws InterruptedException{ //Queue <Integer> qe=new LinkedList<Integer>();ArrayList<JobInfo> jobqueue =new ArrayList<JobInfo>(); Map<Integer,ArrayList> refmap=new ConcurrentHashMap<Integer,ArrayList>(); Map<Integer,ArrayList> refedmap=new ConcurrentHashMap<Integer,ArrayList>(); Map<Integer,Integer> stautsmap=new ConcurrentHashMap<Integer,Integer>(); Map<Integer,ArrayList<String>> sourcemap=new 
ConcurrentHashMap<Integer,ArrayList<String>>(); Map<Integer,ArrayList<String>> jobinfomap=new ConcurrentHashMap<Integer,ArrayList<String>>(); Map<Integer,ArrayList<String>> jobparmap=new ConcurrentHashMap<Integer,ArrayList<String>>(); //System.out.println(qe.poll());EtlControl ec=new EtlControl(jobqueue,refmap,refedmap,stautsmap,sourcemap,jobinfomap,jobparmap);ec.init();System.out.println("refmap: "+refmap.toString());System.out.println("refedmap: " +refedmap.toString());System.out.println("jobqueue.toString(): "+jobqueue.toString());System.out.println("sourcemap: "+sourcemap.toString());System.out.println("jobinfomap: "+jobinfomap.toString());System.out.println("jobparmap: "+jobparmap.toString());ec.start();System.out.println(stautsmap.toString());}}
作业线程类
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader;import java.util.ArrayList;import java.util.Collections;import java.util.Comparator;import java.util.Date;import java.util.Iterator;import java.util.Map;import java.util.Queue;import ch.ethz.ssh2.ChannelCondition;import ch.ethz.ssh2.Connection; import ch.ethz.ssh2.Session; import ch.ethz.ssh2.StreamGobbler;public class LaunchJob extends Thread{private static final long TIME_OUT = 100;public static volatile int scnt=0; //正在运行作业的个数int job;private int jobstatus=9; //代表运行中Map<Integer,ArrayList> refmap;Map<Integer,ArrayList> refedmap;Map<Integer,Integer> stautsmap;Map<Integer,ArrayList<String>> sourcemap;//服务器源信息Map<Integer,ArrayList<String>> jobinfomap; //作业基本信息//Queue <Integer> qe;ArrayList<JobInfo> jobqueue;Map<Integer,ArrayList<String>> jobparmap;//job参数信息public LaunchJob(int job,Map<Integer,ArrayList> refmap,Map<Integer,ArrayList> refedmap,Map<Integer,Integer> stautsmap,ArrayList<JobInfo> jobqueue,Map<Integer,ArrayList<String>> sourcemap,Map<Integer,ArrayList<String>> jobinfomap,Map<Integer,ArrayList<String>> jobparmap){this.job=job;this.refmap=refmap;this.refedmap=refedmap;this.stautsmap=stautsmap;this.jobqueue=jobqueue;this.sourcemap=sourcemap;this.jobinfomap=jobinfomap;this.jobparmap=jobparmap;}public void setJobstatus(int stauts){this.jobstatus=stauts;} public int getJobstatus() { return jobstatus; } public static synchronized void incrementScnt() { scnt++; } public static synchronized void reduceScnt() { scnt--; } public void run() { //通过传过来的jobid 找到jobname job类型,source_id再去sourcemap关联到ip,目录信息 ArrayList<String> jobinfolist=jobinfomap.get(job);// String jobname=jobinfomap.get(job).get(0); //String jobkind=jobinfomap.get(job).get(1); String jobname=jobinfolist.get(0); String jobkind=jobinfolist.get(1); int source_id=Integer.parseInt(jobinfolist.get(2)); //int priority=Integer.parseInt(jobinfolist.get(3)); 
if(jobkind.equals("shell")||jobkind.equals("mr")) execShellOrMr(jobname,source_id); } public void execProc() { } public void execMr() { } public void execShellOrMr(String jobname,int source_id) {// /正在运行状态stautsmap.put(job, getJobstatus());//String hostname = "10.207.0.22";//String username = "oracle";//String password = "ora123!@#";//通过source_id去查找服务器ip,用户,密码等String hostname = sourcemap.get(source_id).get(0);String username=sourcemap.get(source_id).get(2);String password=sourcemap.get(source_id).get(3);String dirlog=sourcemap.get(source_id).get(6)+"/"+jobname.replace(" ", "_")+"_error.log";String dir=sourcemap.get(source_id).get(1);StringBuilder sbcommand=new StringBuilder("sh ").append(dir).append("/").append(jobname);if (jobparmap.containsKey(job)){//通过job的id号去找参数 ArrayList<String> jobparlist=jobparmap.get(job); Iterator<String> it =jobparlist.iterator(); while (it.hasNext()) { sbcommand.append(" ").append(it.next()); }}System.out.println(sbcommand.toString());try{/* Create a connection instance */Connection conn = new Connection(hostname);/* Now connect */conn.connect();/* Authenticate */boolean isAuthenticated = conn.authenticateWithPassword(username, password);if (isAuthenticated == false)throw new IOException("Authentication failed.");/* Create a session */Session sess = conn.openSession();//sess.execCommand("uname -a && date && uptime && who");incrementScnt(); //状态为正在运行的数量加1//sess.execCommand("sh "+job);System.out.println(sbcommand.toString());sess.execCommand(sbcommand.toString()+" 2>"+dirlog);System.out.println("Here is some information about the remote host:");InputStream stdout = new StreamGobbler(sess.getStdout());BufferedReader br = new BufferedReader(new InputStreamReader(stdout));while (true){String line = br.readLine();if (line == null)break;System.out.println(line);}/* Show exit status, if available (otherwise "null") */sess.waitForCondition(ChannelCondition.EXIT_STATUS, TIME_OUT);reduceScnt(); 
//状态为正在运行的数量减1setJobstatus(sess.getExitStatus());System.out.println("ExitCode: " + getJobstatus());//运行状态写入哈希表stautsmap.put(job, new Integer(getJobstatus()));checkAndJoinQue();/* Close this session */sess.close();/* Close the connection */conn.close();}catch (IOException e){e.printStackTrace(System.err); System.exit(2);} } public void checkAndJoinQue() {//如果运行成功,检查其他待运行作业,加入队列if (getJobstatus()==0){//如果运行成功,检查依赖于该作业的一批作业是否应该加入待运行队列 if (refedmap.containsKey(job)) { ArrayList<Integer> refedjobs=refedmap.get(job);Iterator it =refedjobs.iterator(); while (it.hasNext()) { int job1=(Integer) it.next(); //要判断的job ArrayList<Integer> refjobs=refmap.get(job1); //所依赖的job int totalsize=refjobs.size(); int cnt=0; for (int i=0;i<totalsize;i++) { int job2=refjobs.get(i); if (stautsmap.containsKey(job2)) { if (stautsmap.get(job2).intValue()==0) cnt++; else break; //只要发现有一个失败的作业就没必要再继续检查了 } else break; //只要发现有一个作业还没写入开始,没有写入状态表就没必要再继续检查了 } if (cnt==totalsize)//全部运行成功// qe.offer(job1); { ArrayList<String> jobinfolist=jobinfomap.get(job1);// String jobname=jobinfomap.get(job).get(0); //String jobkind=jobinfomap.get(job).get(1);// String jobname=jobinfolist.get(0); //String jobkind=jobinfolist.get(1);// int source_id=Integer.parseInt(jobinfolist.get(2)); int priority=Integer.parseInt(jobinfolist.get(3)); synchronized(jobqueue) { jobqueue.add(new JobInfo(priority,new Date().getTime(),job1)); resortJobqueue();System.out.println("jobqueue.toString(): "+jobqueue.toString()); jobqueue.notifyAll(); } } }}} } public void resortJobqueue() { Comparator<JobInfo> comp = new Comparator<JobInfo>() { public int compare(JobInfo o1, JobInfo o2) { int res = o1.getPriority()-o2.getPriority(); if(res == 0) { if(o1.getStarttime() < o2.getStarttime()) res = 1; else res = (o1.getStarttime()==o2.getStarttime() ? 0 : -1); } return -res; } }; synchronized (jobqueue) { Collections.sort(jobqueue, comp); } }}