Java Future 并发设计模式
简单而言,Future模式可以在连续流程中满足数据驱动的并发需求,既获得了并发执行的性能提升,又不失连续流程的简洁优雅。
除了Future外,其它比较常见的并发设计模式还包括“回调驱动(多线程环境下)”、“消息/事件驱动(Actor模型中)”等。
回调是最常见的异步并发模式,它有即时性高、接口设计简单等有点。但相对于Future,其缺点也非常明显。首先,多线程环境下的回调一般是在触发回调的模块线程中执行的,这就意味着编写回调方法时通常必须考虑线程互斥问题;其次,回调方式接口的提供者在本模块的线程中执行用户应用的回调也是相对不安全的,因为你无法确定它会花费多长时间或出现什么异常,从而可能间接导致本模块的即时性和可靠性受影响;再者,使用回调接口不利于顺序流程的开发,因为回调方法的执行是孤立的,要与正常流程汇合是比较困难的。因此回调接口适合于在回调中只需要完成简单任务,并且不必与其它流程汇合的场景。
上述这些回调模式的缺点恰恰正是Future的长项。由于Future的使用是将异步的数据驱动天然的融入顺序流程中,因此你完全不必考虑线程互斥问题,Future甚至可以在单线程的程序模型(例如协程)中实现(参见下文将要提到的“Lazy Future”)。另一方面,提供Future接口的模块完全不必担心像回调接口那样的可靠性问题和可能对本模块的即时性影响。
另一类常见的并发设计模式是“消息(事件)驱动”,它一般运用在Actor模型中:服务请求者向服务提供者发送消息,然后继续进行后续不依赖服务处理结果的任务,在需要依赖结果前终止当前流程并记录状态;在等到回应消息后根据记录的状态触发后续流程。这种基于状态机的并发控制虽然比回调更适合于有延续性的顺序流程,但开发者却不得不将连续流程按照异步服务的调用切断为多个按状态区分的子流程,这样又人为的增加了开发的复杂性。运用Future模式可以避免这个问题,不必为了异步调用而打碎连续的流程。但是有一点应当特别注意:Future.get()方法可能会阻塞线程的执行,所以它通常无法直接融入常规的Actor模型中。(基于协程的Actor模型可以较好的解决这个冲突)
Future的灵活性还体现在其同步和异步的自由取舍,开发者可以根据流程的需要自由决定是否需要等待[Future.isDone()],何时等待[Future.get()],等待多久[Future.get(timeout)]。比如可以根据数据是否就绪而决定要不要借这个空档完成点其它任务,这对于实现“异步分支预测”机制是相当方便的。
除了上面提到的基础形态之外,Future还有丰富的衍生变化,这里就列举几个常见的。
与一般的Future不同,Lazy Future在创建之初不会主动开始准备引用的对象,而是等到请求对象时才开始相应的工作。因此,Lazy Future本身并不是为了实现并发,而是以节约不必要的运算资源为出发点,效果上与Lambda/Closure类似。例如设计某些API时,你可能需要返回一组信息,而其中某些信息的计算可能会耗费可观的资源。但调用者不一定都关心所有的这些信息,因此将那些需要耗费较多资源的对象以Lazy Future的形式提供,可以在调用者不需要用到特定的信息时节省资源。
另外Lazy Future也可以用于避免过早的获取或锁定资源而产生的不必要的互斥。
Promise可以看作是Future的一个特殊分支,常见的Future一般是由服务调用者直接触发异步处理流程,比如调用服务时立即触发处理或 Lazy Future的取值时触发处理。但Promise则用于显式表示那些异步流程并不直接由服务调用者触发的情景。例如Future接口的定时控制,其异步流程不是由调用者,而是由系统时钟触发,再比如淘宝的分布式订阅框架提供的Future式订阅接口,其等待数据的可用性不是由订阅者决定,而在于发布者何时发布或更新数据。因此,相对于标准的Future,Promise接口一般会多出一个set()或fulfill()接口。
常规的Future是一次性的,也就是说当你获得了异步的处理结果后,Future对象本身就失去意义了。但经过特殊设计的Future也可以实现复用,这对于可多次变更的数据显得非常有用。例如前面提到的淘宝分布式订阅框架所提供的Future式接口,它允许多次调用waitNext()方法(相当于Future.get()),每次调用时是否阻塞取决于在上次调用后是否又有数据发布,如果尚无更新,则阻塞直到下一次的数据发布。这样设计的好处是,接口的使用者可以在其任何合适的时机,或者直接简单的在独立的线程中通过一个无限循环响应订阅数据的变化,同时还可兼顾其它定时任务,甚至同时等待多个Future。简化的例子如下:
for (;;) { schedule = getNextScheduledTaskTime(); while(schedule > now()) { try { data = subscription.waitNext(schedule - now()); processData(data); } catch(Exception e) {...} } doScheduledTask();}