如何在 RabbitMQ 中设置多次重试尝试?

我正在使用 RabbitMQ,并且我有一个保存电子邮件消息的队列。我的消费者服务取消消息队列并尝试发送它们。如果由于任何原因,我的消费者不能发送消息,我想重新排队再次发送消息。我意识到我可以做一个 basicNack 并设置队列标志为 true,但是,我不想无限期地请求消息(比如,如果我们的电子邮件系统出现故障,我不想继续请求未发送的消息)。我想定义一个有限的次数,我可以重新排队的消息再次发送。但是,当我将电子邮件消息对象排队并发送一个 nack 时,我无法在该对象上设置字段。队列中的消息中不存在已更新的字段。还有别的办法吗?先谢谢你。

80237 次浏览

There are no such feature like retry attempts in RabbitMQ (as well as in AMQP protocol).

Possible solution to implement retry attempts limit behavior:

  1. Redeliver message if it was not previously redelivered (check redelivered parameter on basic.deliver method - your library should have some interface for this) and drop it and then catch in dead letter exchange, then process somehow.

  2. Each time message cannot be processed publish it again but set or increment/decrement header field, say x-redelivered-count (you can chose any name you like, though). To get control over redeliveries in this case you have to check the field you set whether it reaches some limit (top or bottom - 0 is my choise, a-la ttl in ip header from tcp/ip).

  3. Store message unique key (say uuid, but you have to set it manually when you publish message) in Redis, memcache or other storage, even in mysql alongside with redeliveries count and then on each redelivery increment/decrement this value until it reach the limit.

  4. (for real geeks) write plugin that will implement such behavior like you want.

The pro of #3 is that redelivered message stay in queue head. This is important if you have long queue or if message order is important for you (note, that redeliveries will break strict messages order, see official docs for details or this question on SO).

P.S.:

There is similar answer in this topic, but in php. Look through it, maybe it helps you a bit (start reading it from words "There are multiple techniques to deal with cycle redeliver problem".

Although this is an old question I think you can now easily do this with the combination of dead letter exchanges and the x-death header array added once a message is dead lettered:

The dead-lettering process adds an array to the header of each dead-lettered message named x-death. This array contains an entry for each dead lettering event, identified by a pair of {queue, reason}. Each such entry is a table that consists of several fields:

queue: the name of the queue the message was in before it was dead-lettered

reason: reason for dead lettering, see below

time: the date and time the message was dead lettered as a 64-bit AMQP 0-9-1 timestamp

exchange - the exchange the message was published to (note that this will be a dead letter exchange if the message is dead lettered multiple times)

routing-keys: the routing keys (including CC keys but excluding BCC ones) the message was published with

count: how many times this message was dead-lettered in this queue for this reason

original-expiration (if the message was dead-letterered due to per-message TTL): the original expiration property of the message. The expiration property is removed from the message on dead-lettering in order to prevent it from expiring again in any queues it is routed to.

Read this great article for more info

Check this diagram:

enter image description here

From my perspective the better idea here is to implement a combination of dead-letter exchange and retry logic inside of the consumer. If the consumer fails to handle the message then you put a message into a DeadLetterQueue

Bellow you can find a prototype of dead-letter-exchange implemented with node-amqp and Rabbitmq https://github.com/kharandziuk/dead-letter-exchange-prototype.

You should use Quorum queue type. That queue type has a Delivery limit argument to specify the number of retries to deliver a message before deleting it.

Specify next arguments when create your queue:

x-queue-type: quorum
x-delivery-limit: 3 // it means rabbitmq will make 3 attempts to deliver a message before deleting it