首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 软件管理 > 软件架构设计 >

Quartz 实施多线程任务

2012-10-18 
Quartz 执行多线程任务现在项目(web)中有个需求,每天临晨对一个 WEB 目标进行页面爬取,爬取过程是一个多线

Quartz 执行多线程任务

现在项目(web)中有个需求,每天临晨对一个 WEB 目标进行页面爬取,爬取过程是一个多线程任务,这个任务由?Quartz(Spring2 整合)cronTrigger 来调度。 大概同时会派出5-10个爬虫线程,执行爬虫线程的线程池,也是由 Spring 配置的 SimpleThreadPoolTaskExecutor。?

现在的情况:众所周知,Quartz?缺省维持了一组自己的线程池,default pool size = 10。 实际上我全系统只有一个任务,每天运行一次,那么,每次调度触发时,池中始终只有一个线程会被用到。 这个线程开始执行任务后, 单独配置的爬虫线程池再接着开始执行爬取任务。?

?

?

?

?

--------------------

?

在互联网应用里,去相关的网站自动的获取一些信息,是很常见的一种应用?

比如,我这里现在要实现的一个功能,就是明天定时去ebay,amazon两个网站,获取产品信息,?
从而,为自动分析的商务引擎提供基础的运算数据。?

这里由于需要提取的信息很多,商家太多,所以,需要threadpool来进行多线程的实现,这里?
使用httpclient作为http的实现,用自己的组件jager来提供模板解析的功能通过模板来提取?
指定页面信息,由于这里ebay和amazon会分析ip,如果一个ip访问过多的话,作为舞弊行为,?
而拒绝你继续访问。?

这也就是我们用threadpool来进行多线程的原因,这这里在每个线程里,都会去一个指定的网站?
获取代理服务器的信息,然后根据提取以后的代理服务器的信息,通过程序的代理上ebay和amazon?
这样也就不会发现舞弊的行为了。当一个ip失效,会switch到另一个代理ip继续。?

threadpool的代码比较简单,在1.5里提供了一个ThreadPoolExecutor的类,这个类已经基本?
实现了线程池的功能。?

自己简单封装了一个ThreadPoolManager的类?
代码如下?

?

public class ThreadPoolManager { private static Log log = LogFactory.getLog(ThreadPoolManager.class); private ThreadPoolManager() {} public static ThreadPoolManager newInstance() { return new ThreadPoolManager(); } private ThreadPoolExecutor taskPool = null; // Thread pool used to hold running threads private BlockingQueue taskQueue = null; // Blocking queue used to hold waiting threads private int corePoolSize = 2; private int maxPoolSize = 2; private int queueSize = 4; private long waitTime = 60; public final static String THREADPOOL_POOLSIZE_KEY = "corePoolSize"; public final static String THREADPOOL_MAXSIZE_KEY = "maxPoolSize"; public final static String THREADPOOL_QUEUESIZE_KEY = "queueSize"; public final static String THREADPOOL_WAITTIME_KEY = "waitTime"; void init(Properties p) { corePoolSize = StringUtils.str2Int(p.getProperty(THREADPOOL_POOLSIZE_KEY), 2); maxPoolSize = StringUtils.str2Int(p.getProperty(THREADPOOL_MAXSIZE_KEY), 2); queueSize = StringUtils.str2Int(p.getProperty(THREADPOOL_QUEUESIZE_KEY), 2); waitTime = StringUtils.str2Int(p.getProperty(THREADPOOL_WAITTIME_KEY), 60); taskQueue = new ArrayBlockingQueue(queueSize); taskPool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, waitTime, TimeUnit.SECONDS, taskQueue, new ThreadPoolExecutor.CallerRunsPolicy()); log.info("Initialize thread pool succeed. ThreadPool: corePoolSize = " + corePoolSize + ", queueSize = " + queueSize + ", maxPoolSize = " + maxPoolSize); } public void run(Runnable command) { this.taskPool.execute(command); } }
?

?

调用的方式?

Sample 代码

public class HitJobStart { ..... public void start() { if(findProxies == null) return; ThreadPoolManager pm = ThreadPoolManager.newInstance(); Properties p = new Properties(); p.setProperty(pm.THREADPOOL_MAXSIZE_KEY, ""+concurrent); p.setProperty(pm.THREADPOOL_POOLSIZE_KEY, ""+concurrent); p.setProperty(pm.THREADPOOL_QUEUESIZE_KEY, ""+concurrent); pm.init(p); for(FindProxy fp:findProxies) { hit(fp, urls, timeoutMillionSecond, pm); } } private void hit(FindProxy fp, final String[] urls, final int timeout, ThreadPoolManager pm) { fp.load(); List<Proxy> ps = fp.getValidProxys(); for(final Proxy p : ps) { pm.run(new Runnable(){ public void run() { hit(p, urls, timeout); } }); } } .... }

?

?

这里调用的地方是?
pm.run(new Runnable(){?
public void run()?
{?
hit(p, urls, timeout);?
}?
});?

这样就把一个线程压到线程池里了。

?

?

热点排行