RabbitMQ / AMQP:单队列,同一消息的多个消费者?

我刚刚开始使用RabbitMQ和AMQP。

  • 我有一个消息队列
  • 我有多个消费者,我想用同样的信息做不同的事情。

大多数RabbitMQ文档似乎都关注于循环,即单个消息由单个消费者消费,负载分布在每个消费者之间。这确实是我亲眼所见的行为。

举个例子:生产者只有一个队列,每2秒发送一次消息:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;


connection.on('ready', function () {
var sendMessage = function(connection, queue_name, payload) {
var encoded_payload = JSON.stringify(payload);
connection.publish(queue_name, encoded_payload);
}


setInterval( function() {
var test_message = 'TEST '+count
sendMessage(connection, "my_queue_name", test_message)
count += 1;
}, 2000)




})

这是一位消费者:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
connection.queue("my_queue_name", function(queue){
queue.bind('#');
queue.subscribe(function (message) {
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
})

如果我启动消费者两次,我可以看到每个消费者都在循环行为中使用备用消息。例如,我将在一个终端上看到消息1,3,5,在另一个终端上看到消息2,4,6

我的问题是:

  • 我能让每个消费者收到相同的消息吗?也就是说,两个消费者都得到消息1,2,3,4,5,6 ?这在AMQP/RabbitMQ中被称为什么?它通常是如何配置的?

  • 这种做法普遍吗?我是否应该让交换器将消息路由到两个单独的队列中,只有一个消费者?

205424 次浏览

是的,每个消费者都可以收到相同的消息。看一看 http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmq.com/tutorials/tutorial-five-python.html < / p >

用于路由消息的不同方法。我知道他们是python和java,但它很好地理解原则,决定你在做什么,然后找到如何在JS中做到这一点。听起来好像你想要做一个简单的扇出(教程3),它将消息发送到连接到交换机的所有队列。

你正在做的和你想做的之间的区别基本上是你要设置和交换或输入扇出。扇出交换机将所有消息发送到所有连接的队列。每个队列都有一个消费者,该消费者可以单独访问所有消息。

是的,这是很常见的,这是AMPQ的特点之一。

发送模式是一对一的关系。如果你想“发送”给多个接收者,你应该使用“发布/订阅”模式。更多细节参见http://www.rabbitmq.com/tutorials/tutorial-three-python.html

只要读rabbitmq教程。你发布消息是为了交换,而不是为了排队;然后将其路由到适当的队列。在本例中,您应该为每个消费者绑定单独的队列。这样,它们就可以完全独立地使用消息。

我能让每个消费者收到相同的消息吗?也就是说,两个消费者都得到消息1,2,3,4,5,6 ?这在AMQP/RabbitMQ中被称为什么?它通常是如何配置的?

不,如果消费者在同一个队列上,就不会。来自RabbitMQ的AMQP概念指南:

重要的是要理解,在AMQP 0-9-1中,消息在消费者之间是负载均衡的。

这似乎意味着队列中的循环行为是给定的,并且不可配置。也就是说,为了让多个使用者处理相同的消息ID,需要单独的队列。

这种做法普遍吗?我是否应该让交换器将消息路由到两个单独的队列中,只有一个消费者?

不,它不是,单个队列/多个消费者,每个消费者处理相同的消息ID是不可能的。让交换器将消息路由到两个单独的队列确实更好。

由于我不需要太复杂的路由,展开交流将很好地处理这个问题。我之前没有过多关注exchange,因为node-amqp具有“默认交换”的概念,允许您直接将消息发布到连接,但是大多数AMQP消息都发布到特定的交换。

以下是我的扇出交换,包括发送和接收:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;


connection.on('ready', function () {
connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {
 

var sendMessage = function(exchange, payload) {
console.log('about to publish')
var encoded_payload = JSON.stringify(payload);
exchange.publish('', encoded_payload, {})
}


// Recieve messages
connection.queue("my_queue_name", function(queue){
console.log('Created queue')
queue.bind(exchange, '');
queue.subscribe(function (message) {
console.log('subscribed to queue')
var encoded_payload = unescape(message.data)
var payload = JSON.parse(encoded_payload)
console.log('Recieved a message:')
console.log(payload)
})
})
  

setInterval( function() {
var test_message = 'TEST '+count
sendMessage(exchange, test_message)
count += 1;
}, 2000)
})
})

要获得您想要的行为,只需让每个消费者从自己的队列中消费。您必须使用非直接交换类型(主题、报头、扇出),以便将消息一次性发送到所有队列。

RabbitMQ / AMQP:单队列,同一消息和页面刷新的多个消费者。

rabbit.on('ready', function () {    });
sockjs_chat.on('connection', function (conn) {


conn.on('data', function (message) {
try {
var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));


if (obj.header == "register") {


// Connect to RabbitMQ
try {
conn.exchange = rabbit.exchange(exchange, { type: 'topic',
autoDelete: false,
durable: false,
exclusive: false,
confirm: true
});


conn.q = rabbit.queue('my-queue-'+obj.agentID, {
durable: false,
autoDelete: false,
exclusive: false
}, function () {
conn.channel = 'my-queue-'+obj.agentID;
conn.q.bind(conn.exchange, conn.channel);


conn.q.subscribe(function (message) {
console.log("[MSG] ---> " + JSON.stringify(message));
conn.write(JSON.stringify(message) + "\n");
}).addCallback(function(ok) {
ctag[conn.channel] = ok.consumerTag; });
});
} catch (err) {
console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
}


} else if (obj.header == "typing") {


var reply = {
type: 'chatMsg',
msg: utils.escp(obj.msga),
visitorNick: obj.channel,
customField1: '',
time: utils.getDateTime(),
channel: obj.channel
};


conn.exchange.publish('my-queue-'+obj.agentID, reply);
}


} catch (err) {
console.log("ERROR ----> " + err.stack);
}
});


// When the visitor closes or reloads a page we need to unbind from RabbitMQ?
conn.on('close', function () {
try {


// Close the socket
conn.close();


// Close RabbitMQ
conn.q.unsubscribe(ctag[conn.channel]);


} catch (er) {
console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
}
});
});

据我评估,你的情况是

  • 我有一个消息队列(您接收消息的源,让它命名为q111)

  • 我有多个消费者,我希望用相同的消息做不同的事情。

这里的问题是,当这个队列接收到3个消息时,消息1被消费者a消费,其他消费者B和C消费消息2和3。当你需要一个设置,让rabbitmq同时将这三个消息(1,2,3)的相同副本传递给所有连接的消费者(a,B,C)。

虽然可以通过许多配置来实现这一点,但简单的方法是使用以下两步概念:

  • 使用动态rabbitmq-shovel从所需队列(q111)提取消息并发布到扇出交换机(专门为此目的创建和专用的交换机)。
  • 现在重新配置你的消费者A,B &C(谁正在收听队列(q111)),从这个Fanout交换直接使用独家&每个消费者的匿名队列。

注意:当使用这个概念时,不要直接从源队列(q111)消费,因为已经消费的消息不会被移到你的Fanout交换机。

如果您认为这不能满足您的具体要求……欢迎发表你的建议:-)

如果你碰巧像我一样使用amqplib库,他们有一个发布/订阅RabbitMQ教程实现的方便的例子,你可能会发现这很方便。

最后几个答案几乎是正确的——我有大量的应用程序,它们生成的消息需要最终传递给不同的消费者,所以这个过程非常简单。

如果希望同一消息有多个使用者,请执行以下步骤。

创建多个队列,用于接收消息的每个应用程序,在每个队列属性中,用amq“绑定”一个路由标记。直接交换。更改您的发布应用程序发送到amq。直接使用路由标记(不是队列)。然后AMQP将消息复制到具有相同绑定的每个队列中。工作就像一个魅力:)

例子:假设我有一个我生成的JSON字符串,我将它发布到“amq。使用路由标签“new-sales-order”直接交换,我有一个order_printer应用程序打印订单的队列,我有一个计费系统的队列,它将发送订单的副本和发票给客户端,我有一个web存档系统,我存档订单的历史/合规原因,我有一个客户端web界面,订单被跟踪,因为其他信息来自订单。

所以我的队列是:order_printer, order_billing, order_archive和order_tracking 都有绑定标签“new-sales-order”绑定到它们,这4个都将得到JSON数据

这是一种理想的发送数据的方式,而发布应用程序不知道或关心接收应用程序。

我认为你应该检查使用扇出交换器发送消息。这样你就会收到不同消费者的相同消息,在表下面,RabbitMQ正在为每个新的消费者/订阅者创建不同的队列。

这是javascript教程示例的链接 https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html < / p >

在这种情况下,有一个有趣的选项,我在这里没有找到答案。

您可以在一个消费者中使用“requeue”特性Nack消息,以在另一个消费者中处理它们。 一般来说,这不是一种正确的方式,但也许它对某人来说已经足够好了。< / p >

https://www.rabbitmq.com/nack.html

注意循环(当所有消费者nack+requeue消息时)!

显然你想要的是扇形散开。fanout

读rabbitMQ教程: https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html < / p >

以下是我的例子:

Publisher.js:

amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
if (error0) {
throw error0;
}
console.log('RabbitMQ connected')
try {
// Create exchange for queues
channel = await connection.createChannel()
await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
await channel.publish(process.env.EXCHANGE_NAME, '', Buffer.from('msg'))
} catch(error) {
console.error(error)
}
})

Subscriber.js:

amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
if (error0) {
throw error0;
}
console.log('RabbitMQ connected')
try {
// Create/Bind a consumer queue for an exchange broker
channel = await connection.createChannel()
await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
const queue = await channel.assertQueue('', {exclusive: true})
channel.bindQueue(queue.queue, process.env.EXCHANGE_NAME, '')


console.log(" [*] Waiting for messages in %s. To exit press CTRL+C");
channel.consume('', consumeMessage, {noAck: true});
} catch(error) {
console.error(error)
}
});
这是我在网上找到的一个例子。也许还能帮上忙。 https://www.codota.com/code/javascript/functions/amqplib/Channel/assertExchange < / p >

您只需要为消费者分配不同的组。