首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 网络技术 > 网络基础 >

对chainsaw中一个简略Job Scheduler的扩展

2012-10-21 
对chainsaw中一个简单Job Scheduler的扩展??? 今天在看apache chainsaw这个项目的源代码时,无意中发现了一

对chainsaw中一个简单Job Scheduler的扩展

??? 今天在看apache chainsaw这个项目的源代码时,无意中发现了一个非常简单的Job Scheduler的实现,源代码可以看这里:http://svn.apache.org/repos/asf/logging/chainsaw/trunk/src/main/java/org/apache/log4j/scheduler/,其中一个是Scheduler,另一个是Job接口。

?

??? Scheduler介绍道:

package cn.lettoo.scheduler;import java.text.SimpleDateFormat;import java.util.Date;public class SimpleJob implements Job { private String name; public SimpleJob(String name) { this.name = name; } public void execute() { Date now = new Date(System.currentTimeMillis()); System.out.println(String.format("%s: %s executed by thread %s", SimpleDateFormat.getDateTimeInstance().format(now), this.name, Thread.currentThread().getName())); }}

?

??? 再写一个测试类:

package cn.lettoo.scheduler;public class JobTest {    public static void main(String[] args) {        Scheduler scheduler = new Scheduler();                Job job1 = new SimpleJob("job1");        scheduler.schedule(job1, System.currentTimeMillis(), 5000);                scheduler.start();    }}

?

??? *这里的scheduler.schedule(job1, System.currentTimeMillis(), 5000);表示立即运行,且每5秒运行一次。

??? 执行结果如下:

public void execute() { Date now = new Date(System.currentTimeMillis()); System.out.println(String.format("%s: %s executed by thread %s", SimpleDateFormat.getDateTimeInstance().format(now), this.name, Thread.currentThread().getName())); try { Thread.sleep(10000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }

?

??? 同时,在JobTest中创建多个job,并且让Scheduler去执行:

public static void main(String[] args) {        Scheduler scheduler = new Scheduler();                Job job1 = new SimpleJob("job1");        scheduler.schedule(job1, System.currentTimeMillis(), 5000);                Job job2 = new SimpleJob("job2");        scheduler.schedule(job2, System.currentTimeMillis() + 1000, 5000);                Job job3 = new SimpleJob("job3");        scheduler.schedule(job3, System.currentTimeMillis() + 2000, 5000);                Job job4 = new SimpleJob("job4");        scheduler.schedule(job4, System.currentTimeMillis() + 3000, 5000);                Job job5 = new SimpleJob("job5");        scheduler.schedule(job5, System.currentTimeMillis() + 4000, 5000);                scheduler.start();    }

???? 再运行:

/** * Run scheduler. */ public synchronized void run() { while (!shutdown) { if (jobList.isEmpty()) { linger(); } else { ScheduledJobEntry sje = (ScheduledJobEntry) jobList.get(0); long now = System.currentTimeMillis(); if (now >= sje.desiredExecutionTime) { executeInABox(sje.job); jobList.remove(0); if (sje.period > 0) { sje.desiredExecutionTime = now + sje.period; schedule(sje); } } else { linger(sje.desiredExecutionTime - now); } } } // clear out the job list to facilitate garbage collection jobList.clear(); jobList = null; System.out.println("Leaving scheduler run method"); } /** * We do not want a single failure to affect the whole scheduler. * @param job job to execute. */ void executeInABox(final Job job) { try { job.execute(); } catch (Exception e) { System.err.println("The execution of the job threw an exception"); e.printStackTrace(System.err); } }

???? 可以看到,只要在executeInABox的方法里,使用线程池的线程来执行job,就可以了。现在加一个Scheduler的子类,我加上一个ExecutorService来实现线程池。同时我重写了executeInABox的方法,使用一个Runnable的实现类JobThread来运行job的execute方法。

package cn.lettoo.scheduler;import java.util.concurrent.ExecutorService;public class ThreadPoolScheduler extends Scheduler {       private ExecutorService  pool;    public ThreadPoolScheduler(ExecutorService  pool) {        super();        this.pool = pool;    }    @Override    void executeInABox(final Job job) {        pool.execute(new JobThread(job));    }        class JobThread implements Runnable {        private Job job;                public JobThread(Job job) {            this.job = job;        }                public void run() {            try {                this.job.execute();            } catch (Exception e) {                System.err.println("The execution of the job threw an exception");                e.printStackTrace(System.err);            }        }            }}

?

??? 再修改JobTest:

        // 创建一个可缓存的线程池        ExecutorService pool = Executors.newCachedThreadPool();        // 构造带线程池的Scheduler        ThreadPoolScheduler scheduler = new ThreadPoolScheduler(pool);        .......         scheduler.start();

?

??? 再运行,结果如下:

private Set<Job> runningJobList = new HashSet<Job>(); @Override void executeInABox(final Job job) { if (!runningJobList.contains(job)) { runningJobList.add(job); pool.execute(new JobThread(job)); } } class JobThread implements Runnable { private Job job; public JobThread(Job job) { this.job = job; } public void run() { try { this.job.execute(); synchronized (this) { runningJobList.remove(job); } } catch (Exception e) { System.err .println("The execution of the job threw an exception"); e.printStackTrace(System.err); } } }

?

??? 再执行:

2011-10-14 23:29:27: job1 executed by thread pool-1-thread-1
2011-10-14 23:29:28: job2 executed by thread pool-1-thread-2
2011-10-14 23:29:29: job3 executed by thread pool-1-thread-3
2011-10-14 23:29:30: job4 executed by thread pool-1-thread-4
2011-10-14 23:29:31: job5 executed by thread pool-1-thread-5
2011-10-14 23:29:38: job2 executed by thread pool-1-thread-2
2011-10-14 23:29:40: job4 executed by thread pool-1-thread-4
2011-10-14 23:29:41: job5 executed by thread pool-1-thread-5
2011-10-14 23:29:42: job1 executed by thread pool-1-thread-3
2011-10-14 23:29:44: job3 executed by thread pool-1-thread-1

???? 可以看到,这里已经避免了job在执行的时候,再次被执行。当然,也发生了其他的问题,如job1,第一次执行在23:29:27,执行过程是10秒,那应该在23:29:37执行完,而我们要求是每5秒执行一次的话,则应该立即执行才对,可是实际上是在23:29:42才执行的。为什么会这样呢?原来,在Scheduler中的run()方法中,只要执行了executeInABox方法之后,都会在jobList.remove(0),也就是在job1被scheduler并且到了时间之后,即使没有被执行,但是也被从jobList里remove掉了,然后再重新加5秒再次scheduler上,也就是在23:29:37秒job1真正执行完成时,才再次重新scheduler上,也就是在42秒执行了。这是一个问题,如果要实现这个问题,需要重新对Scheduler的代码进行重构,即在run()方法加上对runningJobList的检查功能。我这里就没有实现,如果您有更好的方法,欢迎指出。

?

热点排行