什么时候使用参与者而不是消息传递解决方案,如 WebSphereMQ 或 TibcoRendez?

我已经读过 What design decisions would favour Scala's Actors instead of JMS?的问题和答案了。

通常,我们使用已经存在多年的消息传递解决方案: 使用诸如 WebSphere MQ 或 Apache ActiveMQ 之类的 JMS 实现进行点对点通信,或者使用 Tibco Rendevous 进行多播消息传递。

他们是非常稳定,证明和提供高可用性和性能。然而,配置和设置似乎比在阿卡复杂得多。

在上述产品(WebSphereMQ 或 ActiveMQ)已经成功使用的某些用例中,我何时以及为什么应该使用 Akka?为什么我应该考虑在我未来的项目中使用 Akka 而不是 WebSphereMQ 或 TibcoRV?

我什么时候该避开阿卡?它是否提供与其他解决方案相同的高可用性和性能?或者甚至将 Akka 与其他消息传递中间件进行比较也是一个坏主意?

Maybe there also is another messaging solution in the JVM environment which I should consider besides JMS (Point-to-Point), TibcoRV (Multicast) and Akka?

38663 次浏览

我不是消息系统方面的专家,但是你可以在你的应用程序中把它们和 Akka 结合起来,这样可以两全其美。这里有一个例子,您可能会发现它对于实验 Akka 和消息传递系统很有用,在这个例子中是 ZeroMQ:

Https://github.com/zcox/akka-zeromq-java

首先,“较老的”消息系统(MQ)在实现方面较老,但它们在工程思想方面较新: 事务性持久队列。Scala Actors 和 Akka 可能是一个较新的实现,但是构建在较旧的 Actors 并发模型上。

The two models however end up being very similar in practice because they both are event message based: See my answer to RabbitMQ vs Akka.

如果只为 JVM 编写代码,那么 Akka 可能是一个不错的选择。否则我会使用 RabbitMQ。

另外,如果你是一个 Scala 开发人员,那么 Akka 应该是一个不用动脑筋的人。然而 Akka 的 Java 绑定并不是非常 Java 化,并且由于 Scala 的类型系统需要强制转换。

另外,在 Java 中,人们通常不会创建不可变对象,我建议您在消息传递时这样做。因此,在 Java 中很容易意外地使用 Akka 做一些不能伸缩的事情(对消息使用可变对象,依赖于奇怪的闭包回调状态)。对于 MQ,这不是问题,因为消息总是以牺牲速度为代价进行序列化。对于阿卡,他们通常不会。

与大多数 MQ 相比,Akka 在大量消费者中的扩展性也更好。这是因为对于大多数 MQ (JMS,AMQP)客户机来说,每个队列连接都需要一个线程... 因此,许多队列 = = 许多永久运行的线程。但这主要是客户的问题。我认为 ActiveMQ Apollo 有一个非阻塞调度程序,据称可以为 AMQP 修复这个问题。RabbitMQ 客户端具有允许您组合多个使用者的通道,但是仍然存在大量使用者可能导致死锁或连接死亡的问题,因此通常需要添加更多线程来避免这个问题。

That being said 阿卡在远程控制 is rather new and probably still doesn't offer all the reliable message guarantees and QoS that traditional message queues provide (but that is changing everyday). Its also generally peer-to-peer but does I think support server-to-peer which is generally what most MQ systems do (ie single point of failure) but there are MQ systems that are peer-to-peer (RabbitMQ is server-to-peer).

最后 RabbitMQ 和 Akka 确实是一对好搭档。您可以使用 Akka 作为 RabbitMQ 的包装器,特别是因为 RabbitMQ 不能帮助您处理消息的消耗和在本地(在单个 JVM 中)路由消息。

何时选择阿卡

  • 拥有大量的消费者(想想数百万)。
  • 需要低延迟
  • 打开到 Actor 并发模型

示例系统: 一个交互式实时聊天系统

何时选择 MQ

  • 需要与许多不同的系统(即非 JVM)集成
  • 消息可靠性比延迟更重要
  • 想要更多的工具和管理 UI
  • 因为前面的几点更适合长时间运行的任务
  • 希望使用与 Actors 不同的并发模型

示例系统: 调度的事务性批处理系统

基于相关评论的编辑

我假设 OP 关心的是 阿卡和消息队列都可以处理的分布式处理。我以为他说的是 分发阿卡对于大多数消息队列来说,使用 Akka 进行本地并发是一个很好的比较.我之所以这样说,是因为您可以将消息队列模型作为并发模型(即主题、队列、交换)在本地应用,反应堆库和 简单反应都可以这样做。

Picking the right concurrency model/library is very important for low latency applications. A distributed processing solution such as a message queue is generally not ideal because the routing is almost always done over the wire which is obviously slower than within application and thus Akka would be a superior choice. However I believe some proprietary MQ technologies allow for local routing. Also as I mentioned earlier most MQ clients are pretty stupid about threading and do not rely on non-blocking IO and have a thread per connection/queue/channel... ironically non-blocking io is not always low latency but is generally more resource efficient.

正如你所看到的,分布式编程和并发编程的主题是相当大的,而且每天都在变化,所以我的初衷并不是混淆,而是专注于分布式消息处理的一个特定领域,这正是我认为 OP 所关心的。就并发性而言,人们可能希望将搜索重点放在“反应式”编程(RFP/流)上,这是一种“较新的”但类似于参与者模型和消息队列模型的模型,所有这些模型通常都可以组合在一起,因为它们是基于事件的。

Akka-Camel 是一个比 ZeroMQ 更好的例子-ZeroMQ 是 tcp 到 tcp 的直接通信(因此为零-没有消息队列)。

With AkkaCamel you can abstract away the queue and produce/consume messages direct from an actor without any code to deal with the message queue message pushing/pulling.

You can forego akka-zeromq and use Akka directly with remoting. 我认为 akka-zeromq 正在从核心库中移除,但是我们为 akka 构建了一个很好的 zeromq 库,称为 scala-zeromq (https://github.com/mDialog/scala-zeromq)

Akka 有几个关键的核心用例:

1) Mutable state

通过将共享状态隐藏在参与者中,可以更容易地处理共享状态。当参与者同步处理消息时,您可以在参与者中保持状态,并通过参与者 API 以高一致性公开该字段

2)分发

并发在 akka 中是免费的,所以你说它实际上是解决分发问题。跨机器和核心的分布。Akka 已经建立了“位置透明度”,用于通过电线发送消息。它还具有用于扩展单个服务的集群和相关模式。这使得它成为一个非常好的分布式解决方案(例如微服务架构)

下面是一个将 Akka 与 ActiveMQ 一起用于 Akka-Camel (使用 Java8)的示例

import akka.actor.Props;
import akka.camel.Camel;
import akka.camel.CamelExtension;
import akka.testkit.TestActorRef;
import akka.testkit.TestProbe;
import org.junit.Ignore;
import org.junit.Test;
import akka.camel.javaapi.UntypedProducerActor;
import akka.camel.javaapi.UntypedConsumerActor;
import static com.rogers.totes.TotesTestFixtures.*;
import org.apache.activemq.camel.component.*;


public class MessagingTest {
@Test @Ignore
public void itShouldStoreAMessage() throws Exception{
String amqUrl = "nio://localhost:61616";
Camel camel = (Camel) CamelExtension.apply(system);
camel.context().addComponent("activemq", ActiveMQComponent.activeMQComponent(amqUrl));


TestProbe probe = TestProbe.apply(system);
TestActorRef producer = TestActorRef.create(system, Props.create((Producer.class)));
TestActorRef consumer = TestActorRef.create(system, Props.create((Consumer.class)));
producer.tell("Produce", probe.ref());


Thread.sleep(1000);
}
}


class Producer extends UntypedProducerActor{


@Override
public String getEndpointUri() {
return "activemq:foo.bar";
}
}


class Consumer extends UntypedConsumerActor{


@Override
public String getEndpointUri() {
return "activemq:foo.bar";
}


@Override
public void onReceive(Object message) throws Exception {
System.out.println("GOT A MESSAGE!" + message);


}
}