使用DefaultConsumer和QueueingConsumer的RabbitMQ Java客户端 [英] RabbitMQ Java Client Using DefaultConsumer vs QueueingConsumer

查看:931
本文介绍了使用DefaultConsumer和QueueingConsumer的RabbitMQ Java客户端的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

  1. DefaultConsumer
    我的DemoConsumer继承自DefaultConsumer.
    我注意到以这种方式从ThreadPool调用handleDelivery().
    (打印Thread.currentThread().getName()每次都看到pool-1-thread-1/2/3/4.
    我也对其进行了几次测试,发现订单已保存.
    只是要确保-由于不同的线程调用句柄传递-会弄乱我的订单吗?

  1. DefaultConsumer
    My DemoConsumer inherits from DefaultConsumer.
    I have noticed that working this way handleDelivery() is invoked from ThreadPool.
    (printing Thread.currentThread().getName() I see pool-1-thread-1/2/3/4 eachtime.
    I have also tested it several times and saw that the order is saved.
    Just to make sure - since different threads call handle delivery - will it mess my order?

QueueingConsumer
所有Java教程都使用QueueingConsumer来消费消息.
在API文档中,它被提为不推荐使用的类.
我应该更改我的代码以从DefaultConsumer继承使用它吗?教程过时了吗?

QueueingConsumer
All of the java tutorial use QueueingConsumer to consume messages.
In the API Docs it is mentioned as a deprecated class.
Should I change my code to inherit from DefaultConsumer use it? Is the tutorial outdated?

谢谢.

推荐答案

是的,DefaultConsumer使用可以更改的内部线程池. 将ExecutorService用作:

Yes,DefaultConsumer uses an internal thread pool that can be changed. Using ExecutorService as:

ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);

阅读 http://www.rabbitmq.com/api-guide.html高级连接选项".

您可以从"QueueingConsumer"中阅读

As you can read from the "QueueingConsumer" doc:

因此,现在可以直接实现Consumer或扩展DefaultConsumer.

As such, it is now safe to implement Consumer directly or to extend DefaultConsumer.

我从未使用过QueueingConsumer,因为它不是由事件驱动的.

I never used QueueingConsumer, because it isn’t properly event-driven.

如您所见:

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
    /// here you are blocked, waiting the next message.
    String message = new String(delivery.getBody());
}

在这种情况下,典型的问题是如何关闭订阅,一种常见的解决方法是在本地主机中发送带标签的关闭消息.其实我不太喜欢.

A typical problem in this case is how to close the subscription, and a common workaround is to send a tagged close message in local host. Actually I don’t like it so much.

如果您扩展DefaultConsumer,则可以正确关闭订阅和频道:

If you extend DefaultConsumer instead, you can correctly close the subscription and the channel:

public class MyConsumer extends DefaultConsumer {...}

然后

public static void main(String[] args) {
MyConsumer consumer = new MyConsumer (channel);
String consumerTag = channel.basicConsume(Constants.queue, false, consumer);
System.out.println("press any key to terminate");
System.in.read();
channel.basicCancel(consumerTag);
channel.close();
....

最后,您不必担心消息顺序,因为如果一切正常,则消息顺序是正确的,但是我认为您无法假设它,因为如果出现问题,您可能会丢失消息顺序.如果您绝对需要维护消息顺序,则应包括一个顺序标记以在消费者端重建消息顺序.

In conclusion, you shouldn’t worry about the message order because if all works correctly, the message order is correct, but I think you can’t assume it because if there is some problem, you can lose the message order. If you absolutely need to maintain message order, you should include a sequential tag to reconstruct the message order at the consumer side.

并且您应该扩展DefaultConsumer.

And you should extend DefaultConsumer.

这篇关于使用DefaultConsumer和QueueingConsumer的RabbitMQ Java客户端的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆