如何连续轮询RabbitMQ以按优先级顺序获取消息? [英] How to poll the RabbitMQ to get messages in order of priority continuously?

查看:97
本文介绍了如何连续轮询RabbitMQ以按优先级顺序获取消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们可以通过安装来自 rabbitmq-priority-queue 使 RabbitMQ 成为分布式优先级队列="nofollow">https://www.rabbitmq.com/community-plugins.html.我将元素推入队列(每个元素都有一个优先级),并且我能够根据需要在消费者中接收队列的内容 - 优先级较高的元素首先出现.

We could make RabbitMQ a distributed priority queue by installing the plugin rabbitmq-priority-queue from https://www.rabbitmq.com/community-plugins.html. I push elements into the queue (each element is pushed with a priority) and I am able to receive the contents of queue in a consumer as desired - higher priority element comes out first.

问题是当这种情况持续发生时优先轮询概念不起作用:

The issue is that the priority polling concept is not working when this happens continuously:

  1. 运行发布者以填充队列中具有不同优先级的 3 个项目.
  2. 消费队列中的消息 - 效果很好 - 按规定消费优先事项.现在消费者等待队列中的任何消息,截至现在队列是空的.
  3. 我再次运行发布者以填充大约 5 个元素.
  4. 消费者不会优先消费队列中的 5 个项目,而是按照第 3 步发布者发布它的顺序消费.

我需要的是在队列的整个内容中具有最大优先级的队列项的每次轮询应该首先出现.

What I need is on every poll of the queue item with maximum priority among the entire contents of queue should come out first.

谁能告诉我这里发生了什么错误?谢谢.

Can anyone tell me what s the bug happening here? Thanks.

这是发布者和消费者(Java)的片段:

Here is the snippet of publisher and consumer (Java):

出版商

public class RabbitMQPublisher {
    private static final String QUEUE = "my-priority-queue-3";
    public static void main(String[] argv) throws Exception {
        final ConnectionFactory factory = new ConnectionFactory();
        final Connection conn = factory.newConnection();
        final Channel ch = conn.createChannel();
        final Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-max-priority", 100);
        ch.queueDeclare(QUEUE, true, false, false, args);
        publish(ch, 24);
        publish(ch, 11);
        publish(ch, 75);
        //second run
        //publish(ch, 27);
        //publish(ch, 77);
        //publish(ch, 12);
        conn.close();
    }

    private static void publish(Channel ch, int priority) throws IOException {
        final BasicProperties props = MessageProperties.PERSISTENT_BASIC.builder().priority(priority).build();
        final String body = "message with priority " + priority;
        ch.basicPublish("", QUEUE, props, body.getBytes());
    }

消费者

while (true) {
        final QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        final String message = new String(delivery.getBody());
        System.out.println(message);
    }

输出:

message with priority 75
message with priority 24
message with priority 11
message with priority 27
message with priority 77
message with priority 12

推荐答案

我能够使用 basicGet 来轮询队列而不是 consumer.nextDelivery() 来解决这个问题.final String message = new String(channel.basicGet(QUEUE_NAME, true).getBody()); 这会从队列中取出具有最高优先级的项目.

I was able to solve this using a basicGet to poll the queue instead of consumer.nextDelivery(). final String message = new String(channel.basicGet(QUEUE_NAME, true).getBody()); This pulls the item with highest priority from the queue.

这篇关于如何连续轮询RabbitMQ以按优先级顺序获取消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆