RabbitMQ: 带有主题交换的持久消息

我是 RabbitMQ 的新手。

我已经建立了一个“话题”交流。消费者可以在发布者之后开始。我希望消费者能够接收在它们启动之前已经发送的消息,而这些消息还没有被消费。

交换机的设置参数如下:

exchange_type => 'topic'
durable => 1
auto_delete => 0
passive => 0

这些消息使用以下参数发布:

delivery_mode => 2

消费者使用 get ()从交换中检索消息。

不幸的是,在任何客户端启动之前发布的任何消息都会丢失。

我想我的问题是,交流不持有消息。也许我需要在发布者和消费者之间建立一个队列。但是这似乎不适用于通过密钥路由消息的“主题”交换。

我应该如何继续? 我使用 Perl绑定 Net::RabbitMQ(应该无关紧要)和 RabbitMQ 2.2.0

27684 次浏览

如果在发布消息时没有可用的连接使用者来处理消息,则需要一个持久队列来存储消息。

交换器不存储消息,但队列可以。令人困惑的是,交易所可以被标记为“持久的”,但所有这一切真正意味着,即使你重新启动代理,交换自己仍将存在,但 没有确实意味着发送到该交易所的任何消息都会自动持久化。

鉴于此,有两种选择:

  1. 在启动发布者以自己创建队列之前执行 一个行政步骤。您可以使用 WebUI 或命令行工具来完成这项工作。确保将其创建为持久队列,这样即使没有活动使用者,它也会存储路由到它的任何消息。
  2. 假设您的使用者被编码为在启动时始终声明(因此自动创建)他们的交换器和队列(并且他们声明它们是持久的) ,那么在启动任何发布者之前只需要声明 至少运行一次你所有的消费者。这将确保正确创建所有队列。然后可以关闭使用者,直到真正需要它们为止,因为队列将持久地存储路由到它们的任何未来消息。

我要1英镑。可能需要执行的步骤不多,您总是可以编写所需的步骤脚本,以便可以重复执行。另外,如果所有使用者都从同一个队列中提取(而不是每个队列都有一个专用队列) ,那么实际上只需要很少的管理开销。

队列是需要正确管理和控制的东西。否则,您最终可能会遇到流氓消费者声明持久队列,使用它们几分钟,但再也不会使用它们。不久之后,你就会有一个永久性增长的队列,没有任何东西减少它的规模,以及一个即将到来的经纪人启示录。

正如 Brian 提到的,交换器不存储消息,主要负责将消息路由到另一个交换器/s 或队列/s。如果交换器没有绑定到队列,那么发送到该交换器的所有消息都将“丢失”。

您不应该需要在发布者脚本中声明固定的客户端队列,因为这可能是不可伸缩的。队列可以由发布者动态创建,并在内部使用交换到交换绑定进行路由。

RabbitMQ 支持交换到交换绑定,这将允许拓扑灵活性、解耦和其他好处。你可以在 到交换绑定的 RabbitMQ 交换[ AMPQ ]了解更多信息

到 Exchange 绑定的 RabbitMQ 交换

Example Topology

示例 Python 代码,如果没有使用队列的使用者,则使用持久性创建交换到交换绑定。

#!/usr/bin/env python
import pika
import sys
 

 

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
 

 

#Declares the entry exchange to be used by all producers to send messages. Could be external producers as well
channel.exchange_declare(exchange='data_gateway',
exchange_type='fanout',
durable=True,
auto_delete=False)
 

#Declares the processing exchange to be used.Routes messages to various queues. For internal use only
channel.exchange_declare(exchange='data_distributor',
exchange_type='topic',
durable=True,
auto_delete=False)
 

#Binds the external/producer facing exchange to the internal exchange
channel.exchange_bind(destination='data_distributor',source='data_gateway')
 

##Create Durable Queues binded to the data_distributor exchange
channel.queue_declare(queue='trade_db',durable=True)
channel.queue_declare(queue='trade_stream_service',durable=True)
channel.queue_declare(queue='ticker_db',durable=True)
channel.queue_declare(queue='ticker_stream_service',durable=True)
channel.queue_declare(queue='orderbook_db',durable=True)
channel.queue_declare(queue='orderbook_stream_service',durable=True)
 

#Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present
channel.queue_bind(queue='orderbook_db',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='orderbook_stream_service',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='ticker_db',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='ticker_stream_service',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='trade_db',exchange='data_distributor',routing_key='*.*.trade')
channel.queue_bind(queue='trade_stream_service',exchange='data_distributor',routing_key='*.*.trade')

您的情况似乎是“消息持久性”。

RabbitMQ 教程文档开始,您需要将 queuemessages都标记为持久的(下面的代码为 C # 版本)。对于其他语言,您可以选择 给你)。

  1. 首先,在 出版商中,您需要确保 queueRabbitMQ节点重新启动后仍然存在。为了做到这一点,我们需要宣布它是持久的:
channel.QueueDeclare(queue: "hello",
durable: true,
....);
  1. 其次,在 消费者,你需要标记你的 持久性讯息-设置 IBasicProperties.SetPersistent为真。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;