RabbitMQ/AMQP:单个队列,同一消息的多个消费者? [英] RabbitMQ / AMQP: single queue, multiple consumers for same message?

查看:60
本文介绍了RabbitMQ/AMQP:单个队列,同一消息的多个消费者?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我刚刚开始使用 RabbitMQ 和 AMQP.

I am just starting to use RabbitMQ and AMQP in general.

  • 我有一个消息队列
  • 我有多个消费者,我想用相同的消息做不同的事情.
  • I have a queue of messages
  • I have multiple consumers, which I would like to do different things with the same message.

大多数 RabbitMQ 文档似乎都集中在轮询机制上,即单个消息由单个消费者消费,负载分布在每个消费者之间.这确实是我亲眼所见的行为.

Most of the RabbitMQ documentation seems to be focused on round-robin, ie where a single message is consumed by a single consumer, with the load being spread between each consumer. This is indeed the behavior I witness.

一个例子:生产者有一个队列,每 2 秒发送一次消息:

An example: the producer has a single queue, and send messages every 2 sec:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

这是一个消费者:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

如果我启动消费者两次,我可以看到每个消费者都在循环使用替代消息.例如,我会在一个终端中看到消息 1、3、5,在另一个终端中看到消息 2、4、6.

If I start the consumer twice, I can see that each consumer is consuming alternate messages in round-robin behavior. Eg, I'll see messages 1, 3, 5 in one terminal, 2, 4, 6 in the other.

我的问题是:

  • 我可以让每个消费者收到相同的消息吗?即,两个消费者都收到消息 1、2、3、4、5、6?这在 AMQP/RabbitMQ 中叫什么?它通常是如何配置的?

  • Can I have each consumer receive the same messages? Ie, both consumers get message 1, 2, 3, 4, 5, 6? What is this called in AMQP/RabbitMQ speak? How is it normally configured?

这是常见的做法吗?我应该让交换将消息路由到两个单独的队列中,而不是一个消费者吗?

Is this commonly done? Should I just have the exchange route the message into two separate queues, with a single consumer, instead?

推荐答案

我可以让每个消费者收到相同的消息吗?即,两个消费者都收到消息 1、2、3、4、5、6?这在 AMQP/RabbitMQ 中叫什么?它通常是如何配置的?

不,如果消费者在同一个队列中,则不会.来自 RabbitMQ 的 AMQP 概念 指南:

No, not if the consumers are on the same queue. From RabbitMQ's AMQP Concepts guide:

重要的是要了解,在 AMQP 0-9-1 中,消息在使用者之间进行负载平衡.

it is important to understand that, in AMQP 0-9-1, messages are load balanced between consumers.

这似乎意味着队列中的循环行为是给定的,并且不可配置.即,为了让多个消费者处理相同的消息 ID,需要单独的队列.

This seems to imply that round-robin behavior within a queue is a given, and not configurable. Ie, separate queues are required in order to have the same message ID be handled by multiple consumers.

这是常见的做法吗?我应该让交换器将消息路由到两个单独的队列中,而不是一个消费者吗?

不,不是,不可能有单个队列/多个消费者,每个消费者都处理相同的消息 ID.让交换器将消息路由到两个单独的队列中确实更好.

No it's not, single queue/multiple consumers with each each consumer handling the same message ID isn't possible. Having the exchange route the message onto into two separate queues is indeed better.

因为我不需要太复杂的路由,扇出交换可以很好地处理这个问题.我之前并没有过多关注交换,因为 node-amqp 具有默认交换"的概念,允许您直接将消息发布到连接,但是大多数 AMQP 消息都发布到特定的交换.

As I don't require too complex routing, a fanout exchange will handle this nicely. I didn't focus too much on Exchanges earlier as node-amqp has the concept of a 'default exchange' allowing you to publish messages to a connection directly, however most AMQP messages are published to a specific exchange.

这是我的扇出交换,包括发送和接收:

Here's my fanout exchange, both sending and receiving:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   

    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })

    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})

这篇关于RabbitMQ/AMQP:单个队列,同一消息的多个消费者?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆