Akka2使用探索4(Actors)
Actor模型为编写并发和分布式系统提供了一种更高的抽象级别。它将开发人员从显式地处理锁和线程管理的工作中解脱出来,使编写并发和并行式系统更加容易。
Akka Actor的API与Scala Actor类似,并且从Erlang中借用了一些语法。
Actor类的定义
定义一个Actor类需要继承UntypedActor,并实现onReceive方法。
Props
Props是一个用来在创建actor时指定选项的配置类。 以下是使用如何创建Props实例的示例.
Props props1 = new Props();
Props props2 = new Props(MyUntypedActor.class);
Props props3 = new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new MyUntypedActor();
}
});
Props props4 = props1.withCreator(new UntypedActorFactory() {
public UntypedActor create() {
return new MyUntypedActor();
}
});
使用Props创建Actor
Actor可以通过将 Props 实例传入 actorOf 工厂方法来创建。
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"), "myactor");
使用默认构造函数创建Actors
import akka.actor.ActorRef;import akka.actor.ActorSystem;import akka.actor.Props; ActorSystem system = ActorSystem.create("MySystem"); ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class), "myactor");
actorOf返回ActorRef实例,它是你创建的UntypedActor实例的句柄,可用它与实际的Actor交互。ActorRef是不可变的,它也是可序列化的,可用于网络传输,在远程主机上它仍然代表原节点上的同一个Actor。
也可以通过actor的context上下文来创建,它被创建他的actor监管,系统创建的actor将成为顶级actor。
public class FirstUntypedActor extends UntypedActor {
ActorRef myActor = getContext().actorOf(new Props(MyActor.class), "myactor");
name 参数是可选的, 但建议你为你的actor起一个合适的名字,因为它将在日志信息中被用于标识各个actor. 名字不可以为空,也不能以以$开头。如果给定的名字已经被赋给了同一个父actor的其它子actor,将会抛出InvalidActorNameException 。
Actor 在创建后将自动异步地启动。当你创建UntypedActor时它会自动调用preStart回调方法,你可以重载preStart方法,加入初始化代码。
注意:
使用system.actorOf创建顶级actor是个阻塞操作,有可能发生死锁。避免方法就是不要在actors或futures内部使用默认的dispatcher调用system.actorOf方法。
使用非缺省构造方法创建Actor
如果UntypedActor的构造方法有参数,就不能用actorOf(new Props(clazz))创建了。需要使用new Props(new UntypedActorFactory() {..})创建,例子如下:
// allows passing in arguments to the MyActor constructor
ActorRef myActor = system.actorOf(new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new MyActor("...");
}
}), "myactor");
UntypedActor API
UntypedActor只定义了一个抽象方法,就是上面提到的onReceive(Objectmessage), 用来实现actor的行为。
如果当前actor的行为与收到的消息不匹配,则会调用unhandled方法, 它的缺省实现是向actor系统的事件流中发布一条 akka.actor.UnhandledMessage(message, sender, recipient)。
另外,它还包括:
getSelf()代表本actor的 ActorRef getSender()代表最近收到的消息的发送actor,通常用于下面将讲到的回应消息中 supervisorStrategy()用户可重写它来定义对子actor的监管策略 getContext()暴露actor和当前消息的上下文信息,如: 用于创建子actor的工厂方法 (actorOf) actor所属的系统 父监管者 所监管的子actor 生命周期监控 hotswap行为栈其余的可见方法是可以被用户重写的生命周期hook。
public void preStart() { }
public void preRestart(Throwable reason, Option message) { for (ActorRef each : getContext().getChildren()) getContext().stop(each); postStop(); }
public void postRestart(Throwable reason) {
preStart();
}
public void postStop() {
}
使用DeathWatch进行生命周期监控
为了能在其它actor结束时(永久终止, 而不是临时的失败和重启)收到通知, actor可以将自己注册为其它actor在终止时所发布的 Terminated 消息的接收者. 这个服务是由actor系统的DeathWatch 组件提供的。
public static class WatchActor extends UntypedActor {
final ActorRef child = this.getContext().actorOf(Props.empty(), "child");
{
this.getContext().watch(child); // <-- this is the only call needed for registration
}
ActorRef lastSender = getContext().system().deadLetters();
@Override
public void onReceive(Object message) {
if (message.equals("kill")) {
getContext().stop(child);
lastSender = getSender();
} else if (message instanceof Terminated) {
final Terminated t = (Terminated) message;
if (t.getActor() == child) {
lastSender.tell("finished");
}
} else {
unhandled(message);
}
}
}
要注意 Terminated 消息的产生与注册和终止行为所发生的顺序无关。多次注册并不表示会有多个消息产生,也不保证有且只有一个这样的消息被接收到:如果被监控的actor已经生成了消息并且已经进入了队列, 在这个消息被处理之前又发生了另一次注册,则会有第二个消息进入队列,因为一个已经终止的actor注册监控器会立刻导致Terminated 消息的发生。
可以使用 context.unwatch(target)来停止对另一个actor的生存状态的监控, 但很明显这不能保证不会接收到Terminated 消息因为该消息可能已经进入了队列。
启动 Hook
actor启动后,它的 preStart 会被立即执行。
重启 Hook
所有的Actor都是被监管的, i.e. 以某种失败处理策略与另一个actor链接在一起。 如果在处理一个消息的时候抛出的异常,Actor将被重启。这个重启过程包括上面提到的Hook:
要被重启的actor的 preRestart 被调用,携带着导致重启的异常以及触发异常的消息; 如果重启并不是因为消息的处理而发生的,所携带的消息为None , 例如,当一个监管者没有处理某个异常继而被它自己的监管者重启时。 这个方法是用来完成清理、准备移交给新的actor实例的最佳位置。 它的缺省实现是终止所有的子actor并调用postStop. 最初 actorOf 调用的工厂方法将被用来创建新的实例。 新的actor的 postRestart 方法被调用,携带着导致重启的异常信息。 By default the preStart is called, just as in the normal start-up case.actor的重启会替换掉原来的actor对象; 重启不影响邮箱的内容, 所以对消息的处理将在 postRestart hook 返回后继续. 触发异常的消息不会被重新接收。在actor重启过程中所有发送到该actor的消息将象平常一样被放进邮箱队列中。
终止 Hook
一个Actor终止后,它的 postStop hook将被调用, 这可以用来取消该actor在其它服务中的注册. 这个hook保证在该actor的消息队列被禁止后才运行, i.e. 之后发给该actor的消息将被重定向到ActorSystem的deadLetters 中。
标识 Actor
每个actor拥有一个唯一的逻辑路径, 此路径是由从actor系统的根开始的父子链构成;它还拥有一个物理路径,如果监管链包含有远程监管者,此路径可能会与逻辑路径不同。这些路径用来在系统中查找actor,例如,当收到一个远程消息时查找收件者, 但是它们的更直接的用处在于:actor可以通过指定绝对或相对路径(逻辑的或物理的)来查找其它的actor并随结果获取一个ActorRef
context.actorFor("/user/serviceA/aggregator") // 查找绝对路径context.actorFor("../joe") // 查找同一父监管者下的兄弟
其中指定的路径被解释为一个 java.net.URI, 它以 / 分隔成路径段. 如果路径以 /开始, 表示一个绝对路径,从根监管者 ("/user"的父亲)开始查找; 否则是从当前actor开始。如果某一个路径段为 .., 会找到当前所遍历到的actor的上一级, 否则则会向下一级寻找具有该名字的子actor。 必须注意的是 actor路径中的.. 总是表示逻辑结构,也就是其监管者。
如果要查找的路径不存在,会返回一个特殊的actor引用,它的行为与actor系统的死信队列类似,但是保留其身份(i.e. 查找路径)。
如果开启了远程调用,则远程actor地址也可以被查找。:
context.actorFor("akka://app@otherhost:1234/user/serviceB")
这些查找动作立即返回一个(可能是远程的)actor引用, 所以你必须向它发送一个消息并等待其响应,来确认serviceB 是真正可访问和运行的。
发送消息
向actor发送消息是使用下列方法之一。
tell 意思是“fire-and-forget”, e.g. 异步发送一个消息并立即返回。这是发送消息的推荐方式。 不会阻塞地等待消息。它拥有最好的并发性和可扩展性。ask 异步发送一条消息并返回一个 Future代表一个可能的回应。需要采用Future的处理模式。每一个消息发送者分别保证自己的消息的次序. try {
String result = operation();
getSender().tell(result);
} catch (Exception e) {
getSender().tell(new akka.actor.Status.Failure(e));
throw e;
}
ask使用方式如下:
List<Future<Object>> futures = [] AkkaClientNoReply client = new AkkaClientNoReply("akka://xw@127.0.0.1:8888/user/server") client.send("hello") 0.upto(15) { futures << akka.pattern.Patterns.ask(client.akkaClient, it, 1000 * 60)
//模拟客户端给服务端发0——15消息,服务器处理(把数值+1返回给客户端) } final Future<Iterable<Object>> aggregate = Futures.sequence(futures, client.system.dispatcher()); final Future<Integer> transformed = aggregate.map(new Mapper<Iterable<Object>, Integer>() { public Integer apply(Iterable<Object> coll) { final Iterator<Object> it = coll.iterator(); int count = 0; while (it.hasNext()) { int x = (Integer) it.next(); count = count + x } return new Integer(count); } }); AkkaServerApp app = new AkkaServerApp("resultHandler", "127.0.0.1", 6666, "result") app.messageProcessor = {msg, UntypedActorContext context -> log.info("1到16之和为" + msg) } app.startup() akka.pattern.Patterns.pipe(transformed).to(app.serverActor)
如果服务端处理消息时发生了异常而导致没有给客户端回应,那么客户端收到的结果将会收到Timeout的Failure:Failure(akka.pattern.AskTimeoutException: Timed out)。可以将异常捕获用Failure封装异常发给客户端:actor.tell(new akka.actor.Status.Failure(e))。
Future的onComplete, onResult, 或 onTimeout 方法可以用来注册一个回调,以便在Future完成时得到通知。从而提供一种避免阻塞的方法。
警告
在使用future回调如 onComplete, onSuccess, and onFailure时, 在actor内部你要小心避免捕捉该actor的引用, i.e. 不要在回调中调用该actor的方法或访问其可变状态。这会破坏actor的封装,会引用同步bug和race condition, 因为回调会与此actor一同被并发调度。 不幸的是目前还没有一种编译时的方法能够探测到这种非法访问。
转发消息
你可以将消息从一个actor转发给另一个。虽然经过了一个‘中转’,但最初的发送者地址/引用将保持不变。当实现功能类似路由器、负载均衡器、备份等的actor时会很有用。
myActor.forward(message, getContext());
回应消息
getSender().tell(replyMsg)
如果没有sender (不是从actor发送的消息或者没有future上下文) 那么 sender 缺省为“dead-letter” actor的引用.
初始化接收消息超时
设置receiveTimeout 属性并声明一个处理 ReceiveTimeout 对象的匹配分支。
public class MyReceivedTimeoutUntypedActor extends UntypedActor {
public MyReceivedTimeoutUntypedActor() {
getContext().setReceiveTimeout(Duration.parse("30 seconds"));
}
public void onReceive(Object message) {
if (message.equals("Hello")) {
getSender().tell("Hello world");
} else if (message == Actors.receiveTimeout()) {
throw new RuntimeException("received timeout");
} else {
unhandled(message);
}
}
}
终止Actor
通过调用ActorRefFactory i.e. ActorContext 或 ActorSystem 的stop 方法来终止一个actor , 通常 context 用来终止子actor,而 system 用来终止顶级actor. 实际的终止操作是异步执行的, i.e.stop 可能在actor被终止之前返回。
如果当前有正在处理的消息,对该消息的处理将在actor被终止之前完成,但是邮箱中的后续消息将不会被处理。缺省情况下这些消息会被送到 ActorSystem 的死信, 但是这取决于邮箱的实现。
actor的终止分两步: 第一步actor将停止对邮箱的处理,向所有子actor发送终止命令,然后处理来自子actor的终止消息直到所有的子actor都完成终止, 最后终止自己 (调用postStop, 销毁邮箱, 向 DeathWatch 发布 Terminated, 通知其监管者). 这个过程保证actor系统中的子树以一种有序的方式终止, 将终止命令传播到叶子结点并收集它们回送的确认消息给被终止的监管者。如果其中某个actor没有响应 (i.e. 由于处理消息用了太长时间以至于没有收到终止命令), 整个过程将会被阻塞。
在 ActorSystem.shutdown被调用时, 系统根监管actor会被终止,以上的过程将保证整个系统的正确终止。
postStop hook 是在actor被完全终止以后调用。
PoisonPill
你也可以向actor发送 akka.actor.PoisonPill 消息, 这个消息处理完成后actor会被终止。 PoisonPill 与普通消息一样被放进队列,因此会在已经入队列的其它消息之后被执行。
优雅地终止
如果你想等待终止过程的结束,或者组合若干actor的终止次序,可以使用gracefulStop:
try {
Future<Boolean> stopped = akka.pattern.Patterns.gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), system);
Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));
// the actor has been stopped
} catch (ActorTimeoutException e) {
// the actor wasn't stopped within 5 seconds
}
热拔插 Become/Unbecome
升级 Upgrade
Akka支持在运行时对Actor消息循环 (e.g. 的实现)进行实时替换: 在actor中调用 context.become 方法。
Become 要求一个 akka.japi.Procedure 参数作为新的消息处理实现。 被替换的代码被存在一个栈中,可以被push和pop。
降级
由于被热替换掉的代码存在栈中,你也可以对代码进行降级,只需要在actor中调用 context.unbecome 方法。
Killing actor
发送Kill消息给actor
Actor 与 异常
在消息被actor处理的过程中可能会抛出异常,例如数据库异常。
消息会怎样如果消息处理过程中(即从邮箱中取出并交给receive后)发生了异常,这个消息将被丢失。必须明白它不会被放回到邮箱中。所以如果你希望重试对消息的处理,你需要自己抓住异常然后在异常处理流程中重试. 请确保你限制重试的次数,因为你不会希望系统产生活锁 (从而消耗大量CPU而于事无补)。
邮箱会怎样如果消息处理过程中发生异常,邮箱没有任何变化。如果actor被重启,邮箱会被保留。邮箱中的所有消息不会丢失。
actor会怎样如果抛出了异常,actor实例将被丢弃而生成一个新的实例。这个新的实例会被该actor的引用所引用(所以这个过程对开发人员来说是不可见的)。注意这意味着如果你不在preRestart 回调中进行保存,并在postRestart回调中恢复,那么失败的actor实例的当前状态会被丢失。