Spring 异步多线程动态任务处理的使用心得
??因为我们每一个任务到是动态创建的,在spring要实现动态的new一个bean对象,我们会用到FactoryBean接口达到我们的目的。我们需要异步和多线程需要使用Spring的TaskExecutor它其实就是javaSE5之后的java.util.concurrent.Executor接口的一共抽象。
新建两个任务类:
?
package org.dave.spring.async;import java.util.Date;public class TaskA implements Runnable{private String name;public TaskA(){name = this.getClass().getName();}@Overridepublic void run() {String threadName = Thread.currentThread().getName(); System.out.println("ThreadName:"+threadName+" beginning work on "+new Date()); System.out.println("ThreadName:"+threadName+" taskName:"+name); System.out.println("ThreadName:"+threadName+" completed work on "+new Date());} }
?
?
package org.dave.spring.async;import java.util.Date;public class TaskB implements Runnable{private String name;public TaskB(){name = this.getClass().getName();}@Overridepublic void run() {String threadName = Thread.currentThread().getName(); System.out.println("ThreadName:"+threadName+" beginning work on "+new Date()); System.out.println("ThreadName:"+threadName+" taskName:"+name); System.out.println("ThreadName:"+threadName+" completed work on "+new Date());} }
?
创建一个默认任务:
?
package org.dave.spring.async;import java.util.Date;public class DefaultTask implements Runnable{ private String name;public DefaultTask(){name = this.getClass().getName();}@Overridepublic void run() {String threadName = Thread.currentThread().getName(); System.out.println("ThreadName:"+threadName+" beginning work on "+new Date()); System.out.println("ThreadName:"+threadName+" taskName:"+name); System.out.println("ThreadName:"+threadName+" completed work on "+new Date());}}
?
?创建一个任务工厂:
package org.dave.spring.async.factory;import org.dave.spring.async.DefaultTask;import org.dave.spring.async.TaskA;import org.dave.spring.async.TaskB;import org.springframework.beans.factory.FactoryBean;import org.springframework.beans.factory.InitializingBean;import org.springframework.stereotype.Component;@Component("taskFactoryBean")@SuppressWarnings("rawtypes")public class TaskFactoryBean implements FactoryBean,InitializingBean{private String taskType;@Overridepublic Object getObject() throws Exception { if(taskType!=null){ if(taskType.equals("a")){ return new TaskA(); }else if(taskType.equals("b")) { return new TaskB(); } }return new DefaultTask();} public void setTaskType(String taskType) {this.taskType = taskType;}@Overridepublic Class getObjectType() {if(taskType!=null){ if(taskType.equals("a")){ return TaskA.class; }else if(taskType.equals("b")) { return TaskB.class; } }return DefaultTask.class;}@Overridepublic boolean isSingleton() {return false;}@Overridepublic void afterPropertiesSet() throws Exception {taskType=null;}}
?创建一个产生任务组件:
package org.dave.spring.async.service;import org.dave.spring.async.DefaultTask;import org.dave.spring.async.TaskA;import org.dave.spring.async.TaskB;import org.dave.spring.async.factory.TaskFactoryBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component("genTask")public class GenTask {@Autowiredprivate TaskFactoryBean taskFactoryBean; public Runnable gen(String taskType) throws Exception{taskFactoryBean.setTaskType(taskType);Object obj = taskFactoryBean.getObject();if(obj instanceof TaskA ){ return (TaskA)obj;}else if(obj instanceof TaskB ){ return (TaskB)obj;}else if(obj instanceof DefaultTask ){ return (DefaultTask)obj;}return null;}}
?
?创建一个简单测试进程:
package org.dave.spring.async.service;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.core.task.TaskExecutor;import org.springframework.stereotype.Service;@Servicepublic class SimpleProcessor {@Autowiredprivate TaskExecutor taskExecutor;@Autowiredprivate GenTask genTask;public void process() throws Exception{ taskExecutor.execute(genTask.gen(null));taskExecutor.execute(genTask.gen("a"));taskExecutor.execute(genTask.gen("b")); }}
?
?Spring 配置文件:
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xmlns:task="http://www.springframework.org/schema/task"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsdhttp://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd"><context:component-scan base-package="org.dave.spring.async"/> <task:executor id="taskExecutor" keep-alive="50000" queue-capacity="50" pool-size="5-10"/> <task:scheduled-tasks><task:scheduled ref="simpleProcessor" method="process" cron="3/10 * * * * ?"/></task:scheduled-tasks></beans>
?<context:component-scan标签是组件注解扫描。task:executor 是任务执行器keep-alive是线程保持活动时间默认为秒,queue-capacity任务队列的容量,pool-size是线程池的大小,5-10表示有5个core线程(活动的线程)10最大的线程数的设置。
<task:scheduled-tasks>任务调度器标签,<task:scheduled 任务调度simpleProcessor组件的process方法,每1分钟调用一次。
?
?
?