RabbitMQ-消息传递顺序

我需要为我的新项目选择一个新的队列代理。

这一次,我需要一个支持发布/订阅的可伸缩队列,并且必须保持消息顺序。

我读了亚历克西斯的评论,他写道:

“事实上,我们认为 RabbitMQ 提供了比卡夫卡更强大的订单”

我读了 rabbitmq 文档中的消息订购部分:

”可以使用 AMQP 方法将消息返回到队列 排队 参数(basic.Recovery、 basic.dep 和 basic.nack) ,或者由于通道 在2.7.0版本和更高版本中关闭,同时保存未确认的消息 个别使用者仍然可以从 如果队列有多个订阅方,则按顺序排列 可能请求消息的其他订阅方 信息总是按照发布顺序保存。”

如果我需要按照消息的顺序处理消息,我只能使用 rabbitMQ 和一个独占队列来处理每个消费者?

RabbitMQ 仍然被认为是有序消息排队的好解决方案吗?

70206 次浏览

好吧,让我们仔细看看你上面描述的场景。我认为重要的是要在提问的代码片段之前立即粘贴 文件,以提供上下文:

AMQP 0-9-1核心规范的4.7节解释了 保证排序的条件: 一个通道,通过一个交换器和一个队列和一个 传出信道将按照与它们相同的顺序接收 RabbitMQ 自2.7.0发布以来提供了更强的保证。

可以使用具有以下特性的 AMQP 方法将消息返回到队列 一个队列参数(basic.Recovery、 basic.return 和 basic.nack) ,或者 由于持有未确认信息的频道关闭 这些场景导致消息在 RabbitMQ 版本早于2.7.0的队列 2.7.0,消息总是按照发布顺序保存在队列中,即使在排队或通道关闭的情况下。(强调添加)

因此,很明显,从2.7.0开始,RabbitMQ 在消息排序方面比原来的 AMQP 规范有了相当大的改进。

对于多个(并行)消费者,无法保证处理顺序。
第三段(粘贴在问题中)接着给出了一个免责声明,我将对其进行解释: “如果队列中有多个处理器,则不再保证消息将按顺序处理。”他们只是说 RabbitMQ 不能违背数学定律。

考虑一下银行的客户队伍。这家银行以按照客户进入银行的先后顺序帮助客户而自豪。顾客排成一队,由3个可用的出纳员中的下一个服务。

今天早上,碰巧三个出纳员同时都有空,接下来的三个顾客走了过来。突然,三个出纳员中的第一个病得很厉害,无法为排队的第一个顾客服务。当这种情况发生时,出纳员2已经完成了与客户2和出纳员3已经开始服务客户3。

现在,只有两种可能。(1)排队的第一个顾客可以回到队伍的头部,或者(2)第一个顾客可以抢先第三个顾客,导致出纳员停止对第三个顾客的工作,开始对第一个顾客的工作。RabbitMQ 和我知道的其他消息代理都不支持这种类型的抢占逻辑。在任何一种情况下,第一个顾客实际上都不会首先得到帮助——第二个顾客会得到帮助,他们很幸运,一下子就找到了一个优秀、快速的出纳员。保证客户得到有序帮助的唯一方法是让一个出纳员一次帮助一个客户,这将给银行带来重大客户服务问题。

我希望这有助于说明你所问的问题。考虑到您有多个使用者,因此不可能在每种可能的情况下都确保按顺序处理消息。如果您有多个队列、多个独占使用者、不同的代理等等,这并不重要——没有办法保证 先验的能够按照多个使用者的顺序应答消息。但是 RabbitMQ 会尽最大的努力。

Message ordering is preserved in Kafka, but only within partitions rather than globally. If your data need both global ordering and partitions, this does make things difficult. However, if you just need to make sure that all of the same events for the same user, etc... end up in the same partition so that they are properly ordered, you may do so. The producer is in charge of the partition that they write to, so if you are able to logically partition your data this may be preferable.

我认为在这个问题中有两件事情是不相似的,消费秩序和加工秩序。

在某种程度上,消息队列可以保证消息按顺序被消费,但它们不能保证消息处理的顺序。

这里的主要区别在于,消息处理的某些方面无法在消费时确定,例如:

  • 正如前面提到的,消费者在处理时可能会失败,这里消息的消费顺序是正确的,然而,消费者未能正确处理它,这将使它返回到队列,直到现在消费顺序仍然完好无损,但我们不知道现在的处理顺序如何

  • 如果我们所说的“处理”是指消息现在被丢弃并完全处理完毕,那么考虑一下处理时间不是线性的情况,换句话说,处理一条消息比处理另一条消息花费的时间更长,因此如果消息3的处理时间比预期的长,那么消息4和消息5可能会被消耗并在消息3之前完成处理

因此,即使您设法将消息返回到队列的前端(顺便说一下,这违反了消耗顺序) ,您仍然不能保证在下一条消息之前的所有消息都已经完成处理。

如果你想确保加工顺序,那么:

  1. 在任何时候只有一个使用者实例
  2. 或者不要使用消息队列,而是使用同步阻塞方法进行处理,这听起来可能很糟糕,但在许多情况下,业务需求是完全有效的,有时甚至是关键的

有一些适当的方法可以保证 RabbitMQ 订阅中消息的顺序。

如果使用多个使用者,它们将使用共享的 ExecutorService处理消息。参见 ConnectionFactory.setSharedExecutor(...)。你可以设置 Executors.newSingleThreadExecutor()

如果对单个队列使用一个 Consumer,则可以使用多个 绑定钥匙绑定该队列(它们可能有通配符)。消息将按照消息代理接收到消息的顺序放入队列中。

例如,您有一个发布者,它在订单重要的地方发布消息:

try (Connection connection2 = factory.newConnection();
Channel channel2 = connection.createChannel()) {
// publish messages alternating to two different topics
for (int i = 0; i < messageCount; i++) {
final String routingKey = i % 2 == 0 ? routingEven : routingOdd;
channel2.basicPublish(exchange, routingKey, null, ("Hello" + i).getBytes(UTF_8));
}
}

现在,您可能希望按照发布消息的相同顺序以 排队的形式接收来自两个 主题的消息:

// declare a queue for the consumer
final String queueName = channel.queueDeclare().getQueue();


// we bind to queue with the two different routingKeys
final String routingEven = "even";
final String routingOdd = "odd";
channel.queueBind(queueName, exchange, routingEven);
channel.queueBind(queueName, exchange, routingOdd);
channel.basicConsume(queueName, true, new DefaultConsumer(channel) { ... });

Consumer现在将按照发布消息的顺序接收消息,而不管您使用的是不同的 主题

RabbitMQ 文档中有一些很好的5分钟教程,可能会有所帮助: Https://www.rabbitmq.com/tutorials/tutorial-five-java.html