如果抛出异常,Akka Actor 不终止

我目前正试图开始与阿卡,我正面临着一个奇怪的问题。我为我的演员准备了以下代码:

class AkkaWorkerFT extends Actor {
def receive = {
case Work(n, c) if n < 0 => throw new Exception("Negative number")
case Work(n, c) => self reply n.isProbablePrime(c);
}
}

这就是我如何开始我的工人:

val workers = Vector.fill(nrOfWorkers)(actorOf[AkkaWorkerFT].start());
val router = Routing.loadBalancerActor(SmallestMailboxFirstIterator(workers)).start()

这就是我关闭一切的方法:

futures.foreach( _.await )
router ! Broadcast(PoisonPill)
router ! PoisonPill

现在发生的情况是,如果我用 n > 0发送 worker 消息(没有抛出异常) ,一切都正常运行,应用程序正常关闭。但是,只要我向它发送一条导致异常的消息,应用程序就不会终止,因为仍然有一个参与者在运行,但是我不知道它来自哪里。

如果有帮助的话,下面是所讨论的线程堆栈:

  Thread [akka:event-driven:dispatcher:event:handler-6] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: 158
AbstractQueuedSynchronizer$ConditionObject.await() line: 1987
LinkedBlockingQueue<E>.take() line: 399
ThreadPoolExecutor.getTask() line: 947
ThreadPoolExecutor$Worker.run() line: 907
MonitorableThread(Thread).run() line: 680
MonitorableThread.run() line: 182

PS: 没有终止的线程不是任何工作线程,因为我已经添加了一个 postStop 回调,它们中的每一个都正确地停止了。

PPS: Actors.registry.shutdownAll解决了这个问题,但我认为关闭所有应该只作为最后的手段,不是吗?

8947 次浏览

akka.conf中打开日志记录器

处理 akka 参与者内部问题的正确方法不是抛出异常,而是设置管理者层次结构

”在并发代码中引发异常(假设我们使用 (非链接的参与者) ,只是简单地打破了线程,目前 执行演员。

没有办法发现事情出了差错(除了 检查堆栈跟踪)。 你对此无能为力。”

通过主管层次结构实现容错(1.2)

* 注意 * 对于旧版本的 Akka (1.2) ,上面的内容是正确的 在较新的版本(例如2.2)中,您仍然需要设置一个监督层次结构,但它会捕获子进程抛出的异常。例如:。

class Child extends Actor {
var state = 0
def receive = {
case ex: Exception ⇒ throw ex
case x: Int        ⇒ state = x
case "get"         ⇒ sender ! state
}
}

还有主管:

class Supervisor extends Actor {
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._


override val supervisorStrategy =
OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
case _: ArithmeticException      ⇒ Resume
case _: NullPointerException     ⇒ Restart
case _: IllegalArgumentException ⇒ Stop
case _: Exception                ⇒ Escalate
}


def receive = {
case p: Props ⇒ sender ! context.actorOf(p)
}
}

通过主管层次结构实现容错(2.2)

正如维克多所建议的那样,关闭日志记录以确保终止日志记录有点奇怪。你可以做的是:

EventHandler.shutdown()

它干净利落地关闭了所有(logger)侦听器,这些侦听器在异常之后维持整个世界的运行:

def shutdown() {
foreachListener(_.stop())
EventHandlerDispatcher.shutdown()
}