首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 开发语言 > 编程 >

线程札记之并发同步

2012-10-26 
线程笔记之并发同步在应用编程中,我们会遇到下面这样的调用模型。。。??当一个业务方法(begin)中顺序调用多个

线程笔记之并发同步

在应用编程中,我们会遇到下面这样的调用模型。。。

?


线程札记之并发同步
?当一个业务方法(begin)中顺序调用多个子业务方法(opertion1-N),且有些子业务方法比较耗时,那么自然而然完成这次调用所需要的时间就比较长了。对于这样的问题,通常情况下会从两个方面对其进行重构和调优:

?

    单个方法调优,即针对operation1-N中比较耗时的方法进行重构已达到期望的效果业务重组和方法重构,即对整个大的业务方法进行重组,找出合乎当前需求(功能和性能)的实现方式

1比较好理解,最常见的就是sql语句,IO和string buffer方面的调优;2则需要看具体的应用场景了。由于本文主要是侧重线程的并发与同步,所以我将例举一个比较特殊的场景(webservices 远程调用)。如下:

?


线程札记之并发同步

?

?

对照上述序列图,若每一个operation方法都将进行一次远程webservice调用,那么一次调用的代价就要包含网络通信方面的开销。如果网络延时很高,多次远程调用的代价就相当大了。那么该如果结合上面的第2点进行重构和调优呢?

?

减少网络调用次数。 例如 operation1->API1,operation2->API2, 那么是否可以提供一个包含API1和API2的新API12供其调用呢?这样一次调用就可以达到目的。多线程调用。如,让operation1与operation2并发,这样调用所需要的时间则为max(cost(operation1), cost(operation2))。这样做会大大提高系统的复杂性,谨慎而为之。

?

接下来进入主题线程札记之并发同步,谈谈并发的具体实现方式。

?

基本测试类ConcurrentSimpleTest

?

public class ConcurrentSimpleTest {public void method1() {System.out.println("before exec method1");try {Thread.sleep(400);} catch (InterruptedException e) {// TODO Auto-generated catch blockSystem.out.println("method1 has been interrupted.");e.printStackTrace();}System.out.println("after exec method1");}public void method2() {System.out.println("before exec method2");try {Thread.sleep(800);} catch (InterruptedException e) {// TODO Auto-generated catch blockSystem.out.println("method2 has been interrupted.");e.printStackTrace();}System.out.println("after exec method2");}}

?

?

方式1:使用线程的join方法

public static void main(String[] args) throws InterruptedException {final ConcurrentSimpleTest cst = new ConcurrentSimpleTest();long s1 = System.currentTimeMillis();cst.method1(); cst.method2();long s2 = System.currentTimeMillis();Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {cst.method1(); }});Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {cst.method2();}});t1.start();t2.start();t1.join(); //t2.join(500); 实际400ms后,方法就返回了t2.join(); //t1.join(x); if x< max((800 - 400), (800-500)), 那该方法会在t2执行完前返回//线程1/2都已返回,需要验证结果long s3 = System.currentTimeMillis();System.out.println("time cost for normal execution:" + (s2-s1));System.out.println("time cost for concurrent execution:" + (s3-s2));}

?

?

方式2:使用信号量对象

?

自定义信号量
public class SimpleMonitor {public volatile int single = 0;public void require(int single) {this.single = single;}public synchronized void release(int single) {this.single -= single;this.notify();}}
?

?

public static void main(String[] args) throws InterruptedException {final ConcurrentSimpleTest cst = new ConcurrentSimpleTest();final SimpleMonitor monitor = new SimpleMonitor();long s1 = System.currentTimeMillis();cst.method1();cst.method2();long s2 = System.currentTimeMillis();monitor.require(2); //初始化信号量Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {try {cst.method1();} finally {monitor.release(1); //信号量-1}}});Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {try {cst.method2();} finally {monitor.release(1); //信号量-1}}});t1.start();t2.start();synchronized (monitor) {while (monitor.single > 0) {monitor.wait(); // monitor.wait(10 * 1000), 进行超时异常处理,中断t1,t2}}                //线程1/2都已返回,需要验证结果long s3 = System.currentTimeMillis();System.out.println("time cost for normal execution:" + (s2 - s1));System.out.println("time cost for concurrent execution:" + (s3 - s2));}

?

?

使用JDK concurrent包中的信号量对象Semaphores
public static void main(String[] args) throws InterruptedException {final ConcurrentSimpleTest cst = new ConcurrentSimpleTest();final Semaphore monitor = new Semaphore(0);long s1 = System.currentTimeMillis();cst.method1();cst.method2();long s2 = System.currentTimeMillis();Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {try {cst.method1();} finally {monitor.release(); //增加信号量}}});Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {try {cst.method2();} finally {monitor.release(); //增加信号量}}});t1.start();t2.start();monitor.acquireUninterruptibly(2); //tryAcquire(int permits, long timeout, TimeUnit unit) 可设置超时处理                //线程1/2都已返回,需要验证结果long s3 = System.currentTimeMillis();System.out.println("time cost for normal execution:" + (s2 - s1));System.out.println("time cost for concurrent execution:" + (s3 - s2));}

?

?

使用JDK concurrent包中的信号量对象CountDownLatch(似乎比Semaphores更适合这种场景)
public static void main(String[] args) throws InterruptedException {final ConcurrentSimpleTest22 cst = new ConcurrentSimpleTest22();final CountDownLatch monitor = new CountDownLatch(2); //设置计数器long s1 = System.currentTimeMillis();cst.method1();cst.method2();long s2 = System.currentTimeMillis();Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {try {cst.method1();} finally {monitor.countDown(); //计数器-1}}});Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {try {cst.method2();} finally {monitor.countDown(); //计数器-1}}});t1.start();t2.start();monitor.await(1, TimeUnit.SECONDS); //monitor.await(1000, TimeUnit.MILLISECONDS); 设定超时阀值//线程1/2都已返回,需要验证结果long s3 = System.currentTimeMillis();System.out.println("time cost for normal execution:" + (s2 - s1));System.out.println("time cost for concurrent execution:" + (s3 - s2));}
?

?

方式3:使用JDK5中新的线程实现方式和线程池

?

?

public static void main(String[] args) throws InterruptedException, ExecutionException {final ExecutorService execPool = Executors.newFixedThreadPool(5);final ConcurrentSimpleTest3 cst = new ConcurrentSimpleTest3();long s1 = System.currentTimeMillis();cst.method1();cst.method2();long s2 = System.currentTimeMillis();Callable<Void> call1 = new Callable<Void>(){@Overridepublic Void call() throws Exception {cst.method1();return null;}};Callable<Void> call2 = new Callable<Void>(){@Overridepublic Void call() throws Exception {cst.method2();return null;}};Future<Void> task1 = execPool.submit(call1);Future<Void> task2 = execPool.submit(call2);task1.get();//task1.get(1, TimeUnit.SECONDS); get方法会阻塞,直到线程执行结束返回结果或者超时task2.get();//task2.get(1, TimeUnit.SECONDS);                //线程1/2都已返回,需要验证结果long s3 = System.currentTimeMillis();System.out.println("time cost for normal execution:" + (s2 - s1));System.out.println("time cost for concurrent execution:" + (s3 - s2));execPool.shutdown();}
?

要达到目的,我们的实现的方式有多种,上面给出的例子也只是抛砖引入,那么该如何抉择呢?个人认为应该考虑以下几个方面:

?

    并发线程(任务)的可控性。如线程执行是否超时,可中断执行中的线程,异常处理以及获取线程执行的状态等。方法1/2/3都针对具体的子线程(任务)可控,而方法2在超时设定方面则是针对并发的线程。合理性,即合乎人的思维和设计,个人认为用信号量同步的方式(方法2)比较合理。高效性,即性能。线程池应该是个不错的选择。简化性,即实现和使用比较简单化。

简化,其实就是更多的复用/重用。对于上面给出的应用场景,可以提出下面这样的模型:


线程札记之并发同步

其中主线程为main thread,在它的执行过程中会启动2个子线程T1和T2,等待T1和T2执行结束后,main thread才能执行结束。

?

进一步抽象下,便可以得到下面这个复杂点的模型:


线程札记之并发同步

模型中每个背景为白色的节点代表一个Thread,边代表执行的步骤和方向。

?

Begin 会启动T1和T2 线程,T2 执行完毕后会执行T4,而从 T1 和 T2 指向 T3 的两条边表示的是 T3 必须等 T1 和 T2 都执行完毕以后才能开始执行。若T1和T2同时执行完毕,那么T3和T4也会并发执行。当T3和T4执行完成,就会执行End,整个过程结束。

?

对于这个模型,我们可以提供一个简单的框架示意图:



线程札记之并发同步

其具体实现主要分为3部分:

?

    Constructor: 线程(任务)构建器,主要构建子线程(任务)以及其关联关系(主要是前置依赖关系)Executor: 线程(任务)执行器,主要负责高效地执行所提交的线程(任务)Monitor: 负责监控一组线程(任务)的执行状态并进行简单地调度。如前置任务出现异常,那么后续任务该任何执行等等

当然还得提供一些额外的访问接口:如,线程(任务)的执行结果等。这样一个框架/组件的雏形就有了,而作为调用者,也只要关心输入和输出即可。

?

?

?

总结:本文主要从一个顺序调用的例子出发,因性能问题引申到了并发,列出了并发同步的几种简单实现示例,并最后提出了一个并发框架的雏形。

?

有兴趣的朋友可以一起讨论其具体实现和应用。

?

--《全文完》--

热点排行