【SEDA异步框架】【四】异步框架总体设计与实现
??????
?????? 在这个框架的设想中,一个stage一般需要有如下几个组件:
?????? 1、D-MQ:分布式消息中间件。用做事件队列,以进行消息的传递。
?????? 2、Local-Queue:本地队列。一般是blockingQueue,用以辅助实现stage内的动态线程池。采用Local-Queue的目的在于避免数据在mq中的堆积导致mq性能下降。
?????? 3、Thread Pool:动态线程池。进行事件的并发处理。
?????? 4、Worker:事件的具体处理器
?????? 5、Stage Controller:stage的性能控制器。用以对stage的队列、资源、调度策略进行控制。
?????? 引此为框架的设计理念,于是有了如下基于SEDA的异步框架的架构设计。
?
2、SEDA异步框架的使用场景 ?
? ? ?? 该异步框架可以用来处理如下几个场景的问题:
?????? 1、系统资源监控(CPU、内存、线程池、队列)
?????? 2、外围服务交互情况(API被调用、上游服务交互、请求方等)监控
?????? 3、系统报警(服务异常、接口压力过大等)
?????? 4、基于日志和事件的数据挖掘(规则挖掘等)
?????? 5、重要业务数据切片转储(里程碑消息、核心服务交互数据等)
?????? 6、异步触发的操作(表A写完后异步写表B等)
?????? 其使用场景大致可如下图所示:
?
3、SEDA异步框架系统总体架构 ?
? ? ? 异步框架在无任何扩展的时候,其主要组件如下:
?????? 1、bundle:消息中心的核心组件。由读、处理、写三部分功能组成。同时整合开关、定时器、动态线程池等元素来支持多样化的输入和需求。bundle可以从多种数据源获取数据,并进行数据的处理。
?????? 1)开关:用以决定该bundle是否被激活。如未被激活,则该bundle将停止读取数据,同时不会在其他服务上产生该bundle对应的数据(比如在mq上生成该bundle的队列、连接、交换机等。)
?????? 2)定时器:用以指定该bundle是否定时运行。如未指定,则实时运行。
?????? 3)动态线程池:用以支持bundle以同步/异步方式调用。如未指定,则同步运行。
?????? 2、bundle decider:用以对bundle的关键指标进行决策(是否激活、时效性、同步类型等)。并同时提供健康检查。
?????? 3、Work carrier:处理数据的最小单元。
?
5、异步框架的AMQP实现(AMQP Bundle) ?
?????? 异步框架扩展的AMQP实现,其架构图如下所示:
??????
?????? 其主要组件说明如下:
?????? 1、amqp bundle:消息中心的核心组件。由读、处理、写三部分功能组成。在整合开关、定时器、动态线程池之余,提供了配置化的订阅订阅管理以及关键行为的声明。
?????? 要声明一个bundle仅需声明对应的bean,示例如下:
???
??????
?????? 2)定时器:用以指定该bundle是否定时运行。默认实时运行。我们为bundle加上定时器,示例如下:
?
??????
?????? 3)动态线程池:用以支持bundle内部的实际数据流处理过程以同步/异步方式调用。默认同步运行。我们为bundle加上动态线程池,示例如下:
?
??????
?????? 4)订阅发布:用以声明收集和推送信息时所需的交换机和密钥。通过支持逗号分隔的多key组合来支持多对多的上下游bundle关系。每个key的配置语 法符合rabbitmq中topic类型的exchange使用规范即可。默认采用“deimos-common”交换机。以下给出几种声明的配置:
?????? 其一:最简易配置。配置要订阅和发布的消息key即可。交换机采用默认配置。
?
??????
?????? 其二:声明特殊的来源和目的地的交换机:
?
???????????
?????? 6)监听容器:按照默认配置实现,并发数可通过配置指定。bundle如需额外设定channel数量,则示例如下:
?
??????
?????? 7)关键行为。用以给发布的消息打上bundle的标签。以辅助其他bundle进行数据筛选和处理。默认以发布的key为关键行为。如需额外声明,则示例如下:
?
??????
?????? 2、bundle decider:用以对bundle的关键指标进行决策(是否激活、时效性、同步类型等)。并同时提供健康检查。默认采用fix strategy decider(定参策略决策器)。可进行配置来指定所需决策器类型,示例如下:
?
??????
?????? 3、work carrier:处理数据的最小单元。Bundle依据决策器指示的状态同步/异步、实时/定时调用work carrier进行处理。完全对开发者透明。用户者无需关心该组件。bundle将结合decider进行调度。同时work carrier处理后的数据推送过程也对开发者透明。开发者所需要做的就是实现bundle的doWork方法,并将处理之后的数据直接return即 可。doWork方法如下所示:
?
?
?????? 以上详细介绍了SEDA框架的AMQP实现中主要组件的作用、声明方式以及实现原理。总结一下,异步框架的AMQP实现中,bundle与bundle之间通过分布式 队列rabbitmq进行数据传递,bundle内部提供包含阻塞队列的动态线程池taskExecutor来进行数据处理,同时提供了定时器timer 来控制bundle的定时/实时调用。workcarrier作为消息处理的最小单元,其调用机制完全对用户透明。消息在bundle中的接收、处理和推 送由bundle decider组件进行管理。用户只需要简单实现doWork方法和声明bundle配置即可实现消息的处理和传递。?
6、一个简单的bundle安装示例 ?
????????????
?????? 3、启动服务。至此,你的bundle也就随着服务的启动就自动启动并开始工作了。
?????? 以上描述的一个bundle启动的过程,当所需要处理的业务被合理拆解成数个bundle(也就是所谓的stage)之后,就可以形成一个完整的基于工作流的系统实现。以下为基于对来源A进行事件收集的简易报警系统,在被拆解为三个bundle之后的数据流:
??????
?6、框架改进空间
?????? 目前框架仅仅提供了一个非常轻量的解决方案,并仅对AMQP进行了实现。后续可有如下几个改进升级的空间:
?????? 1)构成bundle strategy center。各个bundle的decider(决策器)在决策过程中,可以依赖strategy center进行bundle的相关决策。
?????? 2)支持bundle集群化、broker集群化,并引入相关策略(比如一致性哈希)来保证基于该框架的系统的高可用。可纳入bundle strategy center。
?????? 3)框架进一步基于IOC思想,进行面向接口编程的改造和升级。
?????? (暂时整理到这,具体的uml图以及代码后续提供。)