在 Amazon SQS 中将消息从 DLQ 转移的最佳方法是什么?

在 AmazonSQS 中,将消息从死信队列移回到原始队列的最佳实践是什么?

是吗

  1. 收到 DLQ 的消息
  2. 将消息写入队列
  3. 删除来自 DLQ 的消息

还是有更简单的方法?

此外,AWS 最终是否会在控制台中提供一个工具来将消息从 DLQ 移出?

91609 次浏览

这是你最好的选择。在步骤2之后,您的流程可能会失败。在这种情况下,您最终将复制消息两次,但是您的应用程序无论如何都应该处理消息的重新传递(或者不在乎)。

这里有一个快速的方法。这绝对不是最好的或推荐的选择。

  1. 将主 SQS 队列设置为实际 DLQ 的 DLQ,并将最大接收值设置为1。
  2. 查看 DLQ 中的内容(这将把消息移动到主队列,因为这是实际 DLQ 的 DLQ)
  3. 删除该设置,使主队列不再是实际 DLQ 的 DLQ

这里:

import boto3
import sys
import Queue
import threading


work_queue = Queue.Queue()


sqs = boto3.resource('sqs')


from_q_name = sys.argv[1]
to_q_name = sys.argv[2]
print("From: " + from_q_name + " To: " + to_q_name)


from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName=to_q_name)


def process_queue():
while True:
messages = work_queue.get()


bodies = list()
for i in range(0, len(messages)):
bodies.append({'Id': str(i+1), 'MessageBody': messages[i].body})


to_q.send_messages(Entries=bodies)


for message in messages:
print("Coppied " + str(message.body))
message.delete()


for i in range(10):
t = threading.Thread(target=process_queue)
t.daemon = True
t.start()


while True:
messages = list()
for message in from_q.receive_messages(
MaxNumberOfMessages=10,
VisibilityTimeout=123,
WaitTimeSeconds=20):
messages.append(message)
work_queue.put(messages)


work_queue.join()

不需要移动消息,因为它会带来许多其他挑战,如重复消息、恢复场景、丢失消息、重复消息删除检查等。

这是我们实现的解决方案-

通常,我们使用 DLQ 来处理瞬态错误,而不是永久错误

  1. 像普通队列一样读取来自 DLQ 的消息

    福利
    • 以避免重复消息处理
    • 更好地控制 DLQ ——就像我放了一个检查,只有当常规队列被完全处理时才进行处理。
    • 根据 DLQ 上的消息放大流程
  2. 然后遵循常规队列遵循的相同代码。

  3. 更可靠的情况下中止作业或进程终止处理(例如,实例死亡或进程终止)

    福利
    • 代码可重用性
    • 错误处理
    • 恢复和消息重播
  4. 扩展消息可见性,以便没有其他线程处理它们。

    利益
    • 避免使用多个线程处理相同的记录。
  5. 仅当出现永久性错误或成功时才删除消息。

    利益
    • 继续处理,直到我们得到一个暂时的错误。

还有另一种方法可以实现这一点,而无需编写单行代码。 考虑实际的队列名称是 SQS _ Queue,它的 DLQ 是 SQS _ DLQ。 现在按照以下步骤:

  1. 将 SQS _ Queue 设置为 SQS _ DLQ 的 dlq。因为 SQS _ DLQ 已经是 SQS _ Queue 的 dlq。现在,两者都作为对方的 dlq。
  2. 将 SQS _ DLQ 的最大接收计数设置为1。
  3. 现在从 SQS _ DLQ 控制台读取消息。由于消息接收计数为1,因此它将把所有消息发送到它自己的 dlq,即实际的 SQS _ Queue 队列。

我使用 boto3 lib 编写了一个小的 python 脚本来实现这一点:

conf = {
"sqs-access-key": "",
"sqs-secret-key": "",
"reader-sqs-queue": "",
"writer-sqs-queue": "",
"message-group-id": ""
}


import boto3
client = boto3.client(
'sqs',
aws_access_key_id       = conf.get('sqs-access-key'),
aws_secret_access_key   = conf.get('sqs-secret-key')
)


while True:
messages = client.receive_message(QueueUrl=conf['reader-sqs-queue'], MaxNumberOfMessages=10, WaitTimeSeconds=10)


if 'Messages' in messages:
for m in messages['Messages']:
print(m['Body'])
ret = client.send_message( QueueUrl=conf['writer-sqs-queue'], MessageBody=m['Body'], MessageGroupId=conf['message-group-id'])
print(ret)
client.delete_message(QueueUrl=conf['reader-sqs-queue'], ReceiptHandle=m['ReceiptHandle'])
else:
print('Queue is currently empty or messages are invisible')
break

你可以在这个 链接中得到这个脚本

这个脚本基本上可以在任意队列之间移动消息。它支持五十个队列,你可以提供 message_group_id字段。

我们使用以下脚本将消息从 src 队列重新驱动到 tgt 队列:

文件名: redrive.py

用途: python redrive.py -s {source queue name} -t {target queue name}

'''
This script is used to redrive message in (src) queue to (tgt) queue


The solution is to set the Target Queue as the Source Queue's Dead Letter Queue.
Also set Source Queue's redrive policy, Maximum Receives to 1.
Also set Source Queue's VisibilityTimeout to 5 seconds (a small period)
Then read data from the Source Queue.


Source Queue's Redrive Policy will copy the message to the Target Queue.
'''
import argparse
import json
import boto3
sqs = boto3.client('sqs')




def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('-s', '--src', required=True,
help='Name of source SQS')
parser.add_argument('-t', '--tgt', required=True,
help='Name of targeted SQS')


args = parser.parse_args()
return args




def verify_queue(queue_name):
queue_url = sqs.get_queue_url(QueueName=queue_name)
return True if queue_url.get('QueueUrl') else False




def get_queue_attribute(queue_url):
queue_attributes = sqs.get_queue_attributes(
QueueUrl=queue_url,
AttributeNames=['All'])['Attributes']
print(queue_attributes)


return queue_attributes




def main():
args = parse_args()
for q in [args.src, args.tgt]:
if not verify_queue(q):
print(f"Cannot find {q} in AWS SQS")


src_queue_url = sqs.get_queue_url(QueueName=args.src)['QueueUrl']


target_queue_url = sqs.get_queue_url(QueueName=args.tgt)['QueueUrl']
target_queue_attributes = get_queue_attribute(target_queue_url)


# Set the Source Queue's Redrive policy
redrive_policy = {
'deadLetterTargetArn': target_queue_attributes['QueueArn'],
'maxReceiveCount': '1'
}
sqs.set_queue_attributes(
QueueUrl=src_queue_url,
Attributes={
'VisibilityTimeout': '5',
'RedrivePolicy': json.dumps(redrive_policy)
}
)
get_queue_attribute(src_queue_url)


# read all messages
num_received = 0
while True:
try:
resp = sqs.receive_message(
QueueUrl=src_queue_url,
MaxNumberOfMessages=10,
AttributeNames=['All'],
WaitTimeSeconds=5)


num_message = len(resp.get('Messages', []))
if not num_message:
break


num_received += num_message
except Exception:
break
print(f"Redrive {num_received} messages")


# Reset the Source Queue's Redrive policy
sqs.set_queue_attributes(
QueueUrl=src_queue_url,
Attributes={
'VisibilityTimeout': '30',
'RedrivePolicy': ''
}
)
get_queue_attribute(src_queue_url)




if __name__ == "__main__":
main()

只有当原始使用者在多次尝试后未能成功使用消息时,DLQ 才会发挥作用。我们不想删除消息,因为我们相信我们仍然可以做一些事情(可能尝试再次处理或记录它或收集一些统计数据) ,我们不想继续遇到这个消息一次又一次,并停止处理这个消息后面的其他消息的能力。

DLQ 只不过是另一个队列。这意味着我们需要为 DLQ 编写一个消费者,它理想情况下运行频率较低(与原始队列相比) ,从 DLQ 消费并生成消息返回到原始队列,然后从 DLQ 中删除消息——如果这是预期的行为并且我们认为原始消费者现在已经准备好再次处理它的话。如果这个循环持续一段时间应该没问题,因为我们现在还有机会手动检查和进行必要的更改,并在不丢失消息的情况下部署另一个版本的原始消费者(当然是在消息保留期内——默认为4天)。

如果 AWS 现成地提供了这个功能就好了,但是我还没有看到它-他们把这个留给最终用户以他们觉得合适的方式来使用它。

有一些脚本可以为你做到这一点:

# install
npm install replay-aws-dlq;


# use
npx replay-aws-dlq [source_queue_url] [dest_queue_url]
# compile: https://github.com/mercury2269/sqsmover#compiling-from-source


# use
sqsmover -s [source_queue_url] -d [dest_queue_url]

AWS Lambda 解决方案为我们工作得很好-

详细说明: Https://serverlessrepo.aws.amazon.com/applications/arn:aws:serverlessrepo:us-east-1:303769779339:applications~aws-sqs-dlq-redriver

https://github.com/honglu/aws-sqs-dlq-redriver.

通过一次点击和另一次点击启动重新驱动部署!

二零二一年十二月一日上,AWS 发布了将消息从 DLQ 重新驱动回源队列(或自定义队列)的功能。

通过将死信队列重新驱动到源队列,可以简化和增强 标准队列的错误处理工作流。

DLQ redrive

来源:

引入 Amazon 简单队列服务死信队列重驱到源队列

下面是将消息从一个 AWS 队列移动到另一个队列的脚本(用 打印稿编写)。也许对某些人有用。


import {
SQSClient,
ReceiveMessageCommand,
DeleteMessageBatchCommand,
SendMessageBatchCommand,
} from '@aws-sdk/client-sqs'


const AWS_REGION = 'eu-west-1'
const AWS_ACCOUNT = '12345678901'


const DLQ = `https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT}/dead-letter-queue`
const QUEUE = `https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT}/queue`


const loadMessagesFromDLQ = async () => {
const client = new SQSClient({region: AWS_REGION})
const command = new ReceiveMessageCommand({
QueueUrl: DLQ,
MaxNumberOfMessages: 10,
VisibilityTimeout: 60,
})
const response = await client.send(command)


console.log('---------LOAD MESSAGES----------')
console.log(`Loaded: ${response.Messages?.length}`)
console.log(JSON.stringify(response, null, 4))
return response
}


const sendMessagesToQueue = async (entries: Array<{Id: string, MessageBody: string}>) => {
const client = new SQSClient({region: AWS_REGION})
const command = new SendMessageBatchCommand({
QueueUrl: QUEUE,
Entries: entries.map(entry => ({...entry, DelaySeconds: 10})),
// [
// {
//     Id: '',
//     MessageBody: '',
//     DelaySeconds: 10
// }
// ]
})
const response = await client.send(command)
console.log('---------SEND MESSAGES----------')
console.log(`Send: Successful - ${response.Successful?.length}, Failed: ${response.Failed?.length}`)
console.log(JSON.stringify(response, null, 4))
}


const deleteMessagesFromQueue = async (entries: Array<{Id: string, ReceiptHandle: string}>) => {
const client = new SQSClient({region: AWS_REGION})
const command = new DeleteMessageBatchCommand({
QueueUrl: DLQ,
Entries: entries,
// [
//     {
//         "Id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
//         "ReceiptHandle": "someReceiptHandle"
//     }
// ]
})
const response = await client.send(command)
console.log('---------DELETE MESSAGES----------')
console.log(`Delete: Successful - ${response.Successful?.length}, Failed: ${response.Failed?.length}`)
console.log(JSON.stringify(response, null, 4))
}


const run = async () => {
const dlqMessageList = await loadMessagesFromDLQ()


if (!dlqMessageList || !dlqMessageList.Messages) {
console.log('There is no messages in DLQ')
return
}


const sendMsgList: any = dlqMessageList.Messages.map(msg => ({ Id: msg.MessageId, MessageBody: msg.Body}))
const deleteMsgList: any = dlqMessageList.Messages.map(msg => ({ Id: msg.MessageId, ReceiptHandle: msg.ReceiptHandle}))


await sendMessagesToQueue(sendMsgList)
await deleteMessagesFromQueue(deleteMsgList)
}


run()




另外,剧本还有改进的余地,但不管怎样,可能还是有用的。