Extending a Simple Job Scheduler from Chainsaw
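While reading the source code of the Apache Chainsaw project today, I stumbled on a very simple Job Scheduler implementation. The source is here: http://svn.apache.org/repos/asf/logging/chainsaw/trunk/src/main/java/org/apache/log4j/scheduler/. One file is the Scheduler class, the other is the Job interface.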
The Scheduler is introduced as follows:
package cn.lettoo.scheduler;

import java.text.SimpleDateFormat;
import java.util.Date;

public class SimpleJob implements Job {

    private String name;

    public SimpleJob(String name) {
        this.name = name;
    }

    // Print the current time, the job name, and the executing thread.
    public void execute() {
        Date now = new Date(System.currentTimeMillis());
        System.out.println(String.format("%s: %s executed by thread %s",
                SimpleDateFormat.getDateTimeInstance().format(now),
                this.name, Thread.currentThread().getName()));
    }
}
Next, a test class:
package cn.lettoo.scheduler;

public class JobTest {

    public static void main(String[] args) {
        Scheduler scheduler = new Scheduler();
        Job job1 = new SimpleJob("job1");
        scheduler.schedule(job1, System.currentTimeMillis(), 5000);
        scheduler.start();
    }
}
* Here, scheduler.schedule(job1, System.currentTimeMillis(), 5000); means: run immediately, then once every 5 seconds.
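For comparison, the JDK offers the same "start now, then repeat every N milliseconds" pattern through ScheduledExecutorService. The sketch below is not part of Chainsaw or of the code in this post. The class name FixedRateDemo and the helper runPeriodically are made up for illustration, and the period is shortened so the demo finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedRateDemo {

    /**
     * Hypothetical helper: runs a task immediately and then every
     * periodMillis, and reports whether it ran at least `runs` times
     * before a 10 second timeout.
     */
    static boolean runPeriodically(int runs, long periodMillis) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            // Initial delay 0 means "run immediately"; the period is the
            // time between the starts of consecutive runs.
            ses.scheduleAtFixedRate(() -> {
                System.out.println("job1 executed by thread "
                        + Thread.currentThread().getName());
                latch.countDown();
            }, 0, periodMillis, TimeUnit.MILLISECONDS);
            return latch.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            ses.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // The post uses a 5000 ms period; 100 ms keeps the demo short.
        System.out.println(runPeriodically(3, 100));
    }
}
```

With a period of 5000 this corresponds to the schedule(job1, System.currentTimeMillis(), 5000) call. According to the JDK documentation, scheduleAtFixedRate never runs the same task concurrently; a run that takes longer than the period only delays the following runs.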
The output is as follows:
public void execute() {
    Date now = new Date(System.currentTimeMillis());
    System.out.println(String.format("%s: %s executed by thread %s",
            SimpleDateFormat.getDateTimeInstance().format(now),
            this.name, Thread.currentThread().getName()));
    // Simulate a long-running job by sleeping for 10 seconds.
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
At the same time, create several jobs in JobTest and let the Scheduler run them:
public static void main(String[] args) {
    Scheduler scheduler = new Scheduler();

    Job job1 = new SimpleJob("job1");
    scheduler.schedule(job1, System.currentTimeMillis(), 5000);

    Job job2 = new SimpleJob("job2");
    scheduler.schedule(job2, System.currentTimeMillis() + 1000, 5000);

    Job job3 = new SimpleJob("job3");
    scheduler.schedule(job3, System.currentTimeMillis() + 2000, 5000);

    Job job4 = new SimpleJob("job4");
    scheduler.schedule(job4, System.currentTimeMillis() + 3000, 5000);

    Job job5 = new SimpleJob("job5");
    scheduler.schedule(job5, System.currentTimeMillis() + 4000, 5000);

    scheduler.start();
}
Run it again:
/**
 * Run scheduler.
 */
public synchronized void run() {
    while (!shutdown) {
        if (jobList.isEmpty()) {
            linger();
        } else {
            ScheduledJobEntry sje = (ScheduledJobEntry) jobList.get(0);
            long now = System.currentTimeMillis();
            if (now >= sje.desiredExecutionTime) {
                executeInABox(sje.job);
                jobList.remove(0);
                if (sje.period > 0) {
                    sje.desiredExecutionTime = now + sje.period;
                    schedule(sje);
                }
            } else {
                linger(sje.desiredExecutionTime - now);
            }
        }
    }
    // clear out the job list to facilitate garbage collection
    jobList.clear();
    jobList = null;
    System.out.println("Leaving scheduler run method");
}

/**
 * We do not want a single failure to affect the whole scheduler.
 * @param job job to execute.
 */
void executeInABox(final Job job) {
    try {
        job.execute();
    } catch (Exception e) {
        System.err.println("The execution of the job threw an exception");
        e.printStackTrace(System.err);
    }
}

As you can see, all that is needed is to run the job on a thread from a thread pool inside the executeInABox method. So I add a subclass of Scheduler that holds an ExecutorService as its thread pool. I also override executeInABox so that the job's execute method is run by JobThread, a Runnable implementation.
package cn.lettoo.scheduler;

import java.util.concurrent.ExecutorService;

public class ThreadPoolScheduler extends Scheduler {

    private ExecutorService pool;

    public ThreadPoolScheduler(ExecutorService pool) {
        super();
        this.pool = pool;
    }

    @Override
    void executeInABox(final Job job) {
        pool.execute(new JobThread(job));
    }

    class JobThread implements Runnable {

        private Job job;

        public JobThread(Job job) {
            this.job = job;
        }

        public void run() {
            try {
                this.job.execute();
            } catch (Exception e) {
                System.err.println("The execution of the job threw an exception");
                e.printStackTrace(System.err);
            }
        }
    }
}
Then modify JobTest:
// Create a cached thread pool
ExecutorService pool = Executors.newCachedThreadPool();
// Build a Scheduler backed by the thread pool
ThreadPoolScheduler scheduler = new ThreadPoolScheduler(pool);
.......
scheduler.start();
Run it again; the result is as follows:
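// Jobs that are currently executing; the set also serves as its own lock.
private final Set<Job> runningJobList = new HashSet<Job>();

@Override
void executeInABox(final Job job) {
    synchronized (runningJobList) {
        // Skip this run if the previous run of the same job has not finished.
        if (runningJobList.contains(job)) {
            return;
        }
        runningJobList.add(job);
    }
    pool.execute(new JobThread(job));
}

class JobThread implements Runnable {

    private Job job;

    public JobThread(Job job) {
        this.job = job;
    }

    public void run() {
        try {
            this.job.execute();
        } catch (Exception e) {
            System.err.println("The execution of the job threw an exception");
            e.printStackTrace(System.err);
        } finally {
            // Always clear the entry, even if the job threw. Lock the shared
            // set, not this JobThread instance, so that the scheduler thread
            // and the pool threads use the same lock.
            synchronized (runningJobList) {
                runningJobList.remove(job);
            }
        }
    }
}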
Run it once more:
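2011-10-14 23:29:27: job1 executed by thread pool-1-thread-1

As you can see, a job is no longer started again while it is still executing. Of course, this introduces another problem. Take job1: its first run starts at 23:29:27 and takes 10 seconds, so it should finish at 23:29:37. Since we asked for a run every 5 seconds, it should then run again immediately, yet it actually runs again only at 23:29:42.

Why? In the Scheduler's run() method, every call to executeInABox is followed by jobList.remove(0). In other words, once job1's scheduled time arrives, it is removed from jobList even if it was not actually executed, and is then rescheduled 5 seconds later. So around 23:29:37, when job1 really finishes, it has just been rescheduled for 5 seconds later, which is why it runs at 23:29:42.

This is a problem. Solving it requires refactoring the Scheduler code, namely adding a check against runningJobList to the run() method. I have not implemented that here; if you have a better approach, please point it out.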
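One possible direction, sketched below under stated assumptions, is not the refactoring of run() described above but an alternative: remember that a slot was missed while the job was running, and run the job again as soon as it finishes. CatchUpRunner, trigger, runUntilCaughtUp and demo are hypothetical names, and the nested Job interface is only a stand-in for the Job interface used in this post so that the sketch compiles on its own.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CatchUpRunner {

    /** Stand-in for the post's Job interface (assumption). */
    interface Job {
        void execute();
    }

    private final ExecutorService pool;
    // Running jobs; the value is true if a trigger arrived during the run.
    private final Map<Job, Boolean> running = new HashMap<>();

    CatchUpRunner(ExecutorService pool) {
        this.pool = pool;
    }

    /** Intended to be called from executeInABox whenever the job is due. */
    void trigger(final Job job) {
        synchronized (running) {
            if (running.containsKey(job)) {
                running.put(job, Boolean.TRUE); // remember the missed slot
                return;
            }
            running.put(job, Boolean.FALSE);
        }
        pool.execute(() -> runUntilCaughtUp(job));
    }

    private void runUntilCaughtUp(Job job) {
        boolean again;
        do {
            try {
                job.execute();
            } catch (Exception e) {
                e.printStackTrace(System.err);
            }
            synchronized (running) {
                again = running.get(job);
                if (again) {
                    running.put(job, Boolean.FALSE); // consume the missed slot
                } else {
                    running.remove(job);
                }
            }
        } while (again);
    }

    /** Triggers a blocked job twice more while it runs; returns total runs. */
    static int demo() {
        ExecutorService pool = Executors.newCachedThreadPool();
        CatchUpRunner runner = new CatchUpRunner(pool);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Job job = () -> {
            runs.incrementAndGet();
            started.countDown();
            try {
                release.await(); // the first run blocks here until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            runner.trigger(job);
            started.await();      // the first run is now in progress
            runner.trigger(job);  // due again while still running
            runner.trigger(job);  // further missed slots collapse into one
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2: the first run plus one catch-up run
    }
}
```

In ThreadPoolScheduler, executeInABox could delegate to trigger(job) instead of consulting runningJobList. Note the design choice: several missed slots collapse into a single catch-up run, so a job that always takes longer than its period simply runs back to back rather than piling up.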