首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 其他教程 > 开源软件 >

应用Akka的Actor和Future简单地实现并发处理

2012-07-04 
使用Akka的Actor和Future简单地实现并发处理?应用场景:服务端要处理大量的客户端的请求,并且处理请求耗费

使用Akka的Actor和Future简单地实现并发处理

?

应用场景:服务端要处理大量的客户端的请求,并且处理请求耗费较长的时间。这时就需要使用并发处理。多线程是一种方法,这里使用Akka框架处理并发。(以下代码在Groovy1.7.5、akka-actors-1.2下运行成功)

?

这里有三个角色:Client、Master、Worker

Client傻乎乎地发同步请求给Master,一直等到结果返回客户端才离开。

Master接收客户端发来的请求,然后将请求交给Worker处理,处理完成之后将结果返回给Client。

Worker负责具体的业务处理,它耗费的事件比较长。

?

所以这里的关键在于Master,如果Master线性地“接收请求——调用Worker处理得到返回结果——将结果返回”,这样的系统必将歇菜。

使用Akka可以方便地将它变成并行地。

?

?

先看看Client,模拟同时多个客户端给Master发请求

import akka.actor.ActorRef

import static akka.actor.Actors.remote

?

/**

?* User: 谢炜

?* Date: 11-10-4

?* Time: 下午7:36

?*/

class HelloClient implements Runnable {

? ? int seq

? ? String serviceName

?

? ? HelloClient(int seq, String serviceName) {

? ? ? ? this.seq = seq

? ? ? ? this.serviceName = serviceName

? ? }

?

? ? void run() {

? ? ? ? ActorRef actor = remote().actorFor(serviceName, "10.68.15.113", 9999);

? ? ? ? String str = "Hello--" + seq

? ? ? ? println "请求-----${str}"

? ? ? ? Object res = actor.sendRequestReply(str)

? ? ? ? println "返回-----${res}"

}

?

? ? public static void main(String[] args) {

? ? ? ? for (int i = 0; i < 5; i++) {

? ? ? ? ? ? Thread thread = new Thread(new HelloClient(i, "hello-service"))

? ? ? ? ? ? thread.start()//同时启动5个客户端请求Master

? ? ? ? }

}

}

?

?

真正干活的Worker:

import akka.actor.UntypedActor

?

/**

?* User: 谢炜

?* Date: 11-10-5

?* Time: 上午9:27

?*/

class HelloWorker extends UntypedActor {//Worker是一个Actor,需要实现onReceive方法

? ? @Override

? ? void onReceive(Object o) {

? ? ? ? println "Worker 收到消息----" + o

? ? ? ? if (o instanceof String) {

? ? ? ? ? ? String result = doWork(o)//调用真实的处理方法

? ? ? ? ? ? getContext().replyUnsafe(result)//将结果返回给Master

? ? ? ? }

? ? }

//Worker处理其实很简单,仅仅将参数字符串改造一下而已。只不过使其sleep了20秒,让它变得“耗时较长”

? ? String doWork(String str) {

? ? ? ? Thread.sleep(1000 * 20)

? ? ? ? return "result----" + str + " 。"

? ? }

}

?

?

负责并发调度的Master:

import akka.actor.ActorRef

import akka.actor.Actors

import akka.actor.UntypedActor

import akka.actor.UntypedActorFactory

import akka.dispatch.Future

import akka.dispatch.Futures

import java.util.concurrent.Callable

?

/**

?* User: 谢炜

?* Date: 11-10-5

?* Time: 上午9:35

?*/

class HelloMaster extends UntypedActor {

? ? @Override

? ? void onReceive(Object o) {

? ? ? ? println "Master接收到Work消息:" + o

? ? ? ? def clientChannel = getContext().channel()//客户端链接Channel

? ? ? ? //启动worker actor

? ? ? ? ActorRef worker = Actors.actorOf(new UntypedActorFactory() {

? ? ? ? ? ? public UntypedActor create() {

? ? ? ? ? ? ? ? return new HelloWorker();

? ? ? ? ? ? }

? ? ? ? }).start();

?

//这里实现真正的并发

? ? ? ? Future f1 = Futures.future(new Callable() {

? ? ? ? ? ? Object call() {

? ? ? ? ? ? ? ? def result = worker.sendRequestReply(o)//将消息发给worker actor,让Worker处理业务,同时得到返回结果

? ? ? ? ? ? ? ? worker.stop()

? ? ? ? ? ? ? ? println "Worker Return----" + result

? ? ? ? ? ? ? ? clientChannel.sendOneWay(result)//将结果返回给客户端

? ? ? ? ? ? ? ? return result

? ? ? ? ? ? }

? ? ? ? })

?

? ? ? ? println "Future call over"

? ? }

?

? ? public static void main(String[] args) {//启动Master进程,绑定IP、端口和服务

? ? ? ? Actors.remote().start("10.68.15.113", 9999).register(

? ? ? ? ? ? ? ? "hello-service",

? ? ? ? ? ? ? ? Actors.actorOf(HelloMaster.class));

? ? }

}

?

?

看看客户端的调用日志

请求-----Hello--4

请求-----Hello--1

请求-----Hello--3

请求-----Hello--0

请求-----Hello--2

[GENERIC] [11-10-6 下午9:49] [RemoteClientConnected(akka.remote.netty.NettyRemoteSupport@195b6aad,/10.68.15.113:9999)]

[GENERIC] [11-10-6 下午9:49] [RemoteClientStarted(akka.remote.netty.NettyRemoteSupport@195b6aad,/10.68.15.113:9999)]

返回-----result----Hello--0 。

返回-----result----Hello--1 。

返回-----result----Hello--2 。

返回-----result----Hello--4 。

返回-----result----Hello--3 。

?

?

?

服务端的日志:

[GENERIC] [11-10-6 下午9:49] [RemoteServerClientConnected(akka.remote.netty.NettyRemoteSupport@5a4fdf11,Some(/10.68.15.113:53462))]

Master接收到Work消息:Hello--1

Future call over

Master接收到Work消息:Hello--2

Future call over

Worker 收到消息----Hello--1

[DEBUG] ? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-13] [HelloWorker] started

[DEBUG] ? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-13] [HelloWorker] started

Worker 收到消息----Hello--2

Master接收到Work消息:Hello--0

Future call over

Master接收到Work消息:Hello--3

Worker 收到消息----Hello--0

[DEBUG] ? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-7] [HelloWorker] started

Future call over

Master接收到Work消息:Hello--4

[DEBUG] ? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-7] [HelloWorker] started

Worker 收到消息----Hello--3

Future call over

Worker 收到消息----Hello--4

[DEBUG] ? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-7] [HelloWorker] started

Worker 将消息Hello--1处理完成

Worker 将消息Hello--2处理完成

Worker Return----result----Hello--2 。

Worker Return----result----Hello--1 。

[DEBUG] ? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-13] [HelloWorker] stopping

Worker 将消息Hello--0处理完成

[DEBUG] ? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-3] [HelloWorker] stopping

Worker Return----result----Hello--0 。

[DEBUG] ? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-23] [HelloWorker] stopping

Worker 将消息Hello--4处理完成

Worker 将消息Hello--3处理完成

Worker Return----result----Hello--4 。

Worker Return----result----Hello--3 。

[DEBUG] ? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-11] [HelloWorker] stopping

[DEBUG] ? [11-10-6 下午9:49] [akka:event-driven:dispatcher:global-10] [HelloWorker] stopping

?

?

可以从服务端日志看到,Master接收到Work消息后onReceive就结束了(函数最后打印Future call over),一连接收了5个消息,然后Worker才收到消息并处理。最后消息处理完成好后f1的call才收到Worker Return的消息。

这里使用Future实现并发。

?

?

如果不使用Future:

def result = worker.sendRequestReply(o) ? ? ? //将消息发给worker actor

println "Worker Return----" + result

getContext().replyUnsafe(result) ? ? ?// 将worker返回的消息回复给客户端

这就成了同步处理(第一个消息处理完后才接收并处理第二个消息)。

?

?

?

如果在Future后调用了f1.await()或f1.get(),也成同步的了,因为await将等待worker返回后再继续往下执行。 ? ? ? ?

Future f1 = Futures.future(new Callable() {

Object call() {

def result = worker.sendRequestReply(o) ? ? ? //将消息发给worker actor

worker.stop()

println "Worker Return----" + result

clientChannel.sendOneWay(result)

return result

}

})

?

println "Future call over" + f1.get()

?

服务器日志如下:

[GENERIC] [11-10-6 下午10:06] [RemoteServerStarted(akka.remote.netty.NettyRemoteSupport@7e566633)]

[DEBUG] ? [11-10-6 下午10:06] [main] [HelloMaster] started

[GENERIC] [11-10-6 下午10:07] [RemoteServerClientConnected(akka.remote.netty.NettyRemoteSupport@7e566633,Some(/10.68.15.113:53571))]

Master接收到Work消息:Hello--0

[DEBUG] ? [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-3] [HelloWorker] started

Worker 收到消息----Hello--0

Worker 将消息Hello--0处理完成

[DEBUG] ? [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-5] [HelloWorker] stopping

Worker Return----result----Hello--0 。

Future call overresult----Hello--0 。

Master接收到Work消息:Hello--2

Worker 收到消息----Hello--2

[DEBUG] ? [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-3] [HelloWorker] started

Worker 将消息Hello--2处理完成

Worker Return----result----Hello--2 。

Future call overresult----Hello--2 。

Master接收到Work消息:Hello--3

[DEBUG] ? [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-10] [HelloWorker] stopping

[DEBUG] ? [11-10-6 下午10:07] [akka:event-driven:dispatcher:global-3] [HelloWorker] started

Worker 收到消息----Hello--3

Worker 将消息Hello--3处理完成

Worker Return----result----Hello--3 。

Future call overresult----Hello--3 。

Master接收到Work消息:Hello--4

[DEBUG] ? [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-14] [HelloWorker] stopping

[DEBUG] ? [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-3] [HelloWorker] started

Worker 收到消息----Hello--4

Worker 将消息Hello--4处理完成

Worker Return----result----Hello--4 。

Future call overresult----Hello--4 。

Master接收到Work消息:Hello--1

[DEBUG] ? [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-18] [HelloWorker] stopping

[DEBUG] ? [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-3] [HelloWorker] started

Worker 收到消息----Hello--1

Worker 将消息Hello--1处理完成

Worker Return----result----Hello--1 。

Future call overresult----Hello--1 。

[DEBUG] ? [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-21] [HelloWorker] stopping

Master接收到Work消息:Hello--6

[DEBUG] ? [11-10-6 下午10:08] [akka:event-driven:dispatcher:global-24] [HelloWorker] started

Worker 收到消息----Hello--6

Worker 将消息Hello--6处理完成

Worker Return----result----Hello--6 。

Future call overresult----Hello--6 。

Master接收到Work消息:Hello--5

[DEBUG] ? [11-10-6 下午10:09] [akka:event-driven:dispatcher:global-26] [HelloWorker] stopping

Worker 收到消息----Hello--5

[DEBUG] ? [11-10-6 下午10:09] [akka:event-driven:dispatcher:global-24] [HelloWorker] started

?

?

?

需要注意的是,Akka默认使用环境变量%AKKA_HOME%\config\akka.conf配置,默认配置是client的read-timeout = 10(客户端连接10秒后将自动断开,这时服务端再给客户端发消息就发布了了。报RemoteServerWriteFailed异常),可以将值设为0,将一直连着不断开。

actor的timeout默认为5秒,也太短了,延长(不能设为0,0为总是超时).

热点排行