RabbitMQ: fast producer and slow consumer


Problem description

I have an application that uses RabbitMQ as the message queue to send and receive messages between two components: a sender and a receiver. The sender sends messages very quickly. The receiver receives each message and then performs a very time-consuming task (mainly writing very large amounts of data to a database). Because the receiver takes a long time to finish each task before retrieving the next message from the queue, the sender keeps filling up the queue. So my question is: will this cause the message queue to overflow?

The message consumer looks like this:

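import com.rabbitmq.client.QueueingConsumer; // older Java client API (deprecated, removed in 5.x)
import org.json.JSONObject;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

// channel, EXCHANGE_NAME and dao are fields of the enclosing class (not shown)
public void onMessage() throws IOException, InterruptedException {
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    // durable, non-exclusive, non-auto-delete queue named "allDataCase"
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    // autoAck = true: each message counts as handled as soon as it is delivered
    channel.basicConsume(queueName, true, consumer);

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        // assumes the producer encodes the body as UTF-8
        String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
        System.out.println(" [x] Received '" + message + "'");

        JSONObject json = new JSONObject(message);
        String caseID = json.getString("caseID");
        // the following call takes a very long time
        dao.saveToDB(caseID);
    }
}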

Each message received by the consumer contains a caseID. For each caseID, the consumer saves a large amount of data to the database, which takes a very long time. Currently only one consumer is set up, since the producer and consumer use the same queue to publish and subscribe to caseIDs. How can I increase the consumer's throughput so that it catches up with the producer and the queue does not overflow? Should I use multithreading in the consumer to speed up consumption? Should I use multiple consumers to consume the incoming messages simultaneously? Or is there an asynchronous way for the consumer to take messages without waiting for each one to finish? Any suggestions are welcome.

Recommended answer


"Will this cause the message queue to overflow?"

Yes. As the queue grows, RabbitMQ will enter a state of "flow control", throttling publishers to prevent excessive memory consumption. It will also start paging messages to disk rather than holding them in memory.
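The "yes" can be put in numbers. Whenever the producer publishes faster than the consumer finishes its database writes, the backlog grows by the difference every second and never drains. Below is a minimal Java sketch of that arithmetic. The rates (100 msg/s in, 2 msg/s out) are made-up illustration values, not figures from the question.

```java
// Back-of-the-envelope backlog model; the rates used below are illustrative only.
class BacklogModel {
    /** Messages waiting after `seconds`, starting from an empty queue, rates in msg/s. */
    static long backlogAfter(long publishPerSec, long consumePerSec, long seconds) {
        long growthPerSec = publishPerSec - consumePerSec;
        // A queue that the consumer outpaces stays empty; it never goes negative.
        return Math.max(0, growthPerSec * seconds);
    }

    public static void main(String[] args) {
        // 100 msg/s published, 2 msg/s written to the DB, for one hour
        System.out.println(backlogAfter(100, 2, 3600)); // 352800
    }
}
```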


"So how can I speed up the consumer throughput so that the consumer can catch up with the producer and avoid the message overflow in the queue?"

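You have two options:


  1. Add more consumers. Bear in mind that if you choose this option, your database will be written to by multiple concurrent processes. Make sure it can withstand the extra load.

  2. Increase the QoS (prefetch) value of the consuming channel. This pulls more messages from the queue and buffers them on the consumer. It also increases overall processing time: if 5 messages are buffered, the 5th message takes the processing time of messages 1...5 to complete. Note that prefetch only takes effect when messages are acknowledged manually. With autoAck set to true, as in the code above, the broker does not limit deliveries, so the backlog can pile up in the consumer's own memory instead.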
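The trade-off between the two options can be put in numbers. With one consumer working through a prefetched batch one message at a time, the n-th buffered message finishes only after n processing times. With k consumers, throughput can at best grow k-fold, assuming the database keeps up. The timings below are illustrative, not measurements.

In the RabbitMQ Java client, option 2 means consuming with autoAck set to false, calling `channel.basicQos(prefetchCount)`, and acknowledging each message with `channel.basicAck(deliveryTag, false)` after the database write. Option 1 means running more such consumers on the same queue.

```java
import java.util.Arrays;

// Illustrative arithmetic for the two options; processing times are made-up numbers.
class ConsumerMath {
    /**
     * Option 2: one consumer holding `prefetch` buffered messages, handling them one at a
     * time. Returns when each message finishes, measured from when the batch arrived.
     */
    static double[] completionTimes(int prefetch, double secondsPerMessage) {
        double[] done = new double[prefetch];
        for (int i = 0; i < prefetch; i++) {
            // message i+1 waits for every message ahead of it in the local buffer
            done[i] = (i + 1) * secondsPerMessage;
        }
        return done;
    }

    /**
     * Option 1: upper bound on throughput (msg/s) with `consumers` independent consumers,
     * assuming the database does not slow down under the extra concurrent writes.
     */
    static double maxThroughput(int consumers, double secondsPerMessage) {
        return consumers / secondsPerMessage;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(completionTimes(5, 2.0))); // [2.0, 4.0, 6.0, 8.0, 10.0]
        System.out.println(maxThroughput(4, 2.0)); // 2.0
    }
}
```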




"Should I use multithreading in the consumer part to speed up the consumption rate?"

Not unless you have a well-designed solution. Adding parallelism to an application adds considerable overhead on the consumer side. You may end up exhausting the thread pool or running up memory usage.
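If you do add threads inside one consumer, a key part of a well-designed solution is to bound both the number of worker threads and the number of waiting tasks. Then a burst of messages cannot exhaust the pool or memory. The following is a minimal stdlib-only sketch. `saveToDb` is a hypothetical stand-in for `dao.saveToDB(caseID)` that simply sleeps. Acknowledging each RabbitMQ message after its write completes is omitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a bounded worker pool for slow DB writes (stdlib only, no RabbitMQ calls).
class BoundedWorkers {
    private final AtomicInteger saved = new AtomicInteger();
    private final AtomicInteger running = new AtomicInteger();
    private final AtomicInteger maxRunning = new AtomicInteger();

    // Hypothetical stand-in for dao.saveToDB(caseID): sleeps to simulate a slow write.
    private void saveToDb(String caseId) {
        maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            running.decrementAndGet();
            saved.incrementAndGet();
        }
    }

    /** Processes `messages` fake deliveries; returns {saved, peak concurrent writes}. */
    int[] processAll(int messages, int threads, int queueCapacity) {
        // Fixed thread count plus a bounded hand-off queue. When both are full,
        // CallerRunsPolicy makes the submitting (consumer) thread run the task itself,
        // so intake slows down instead of buffering an unbounded number of tasks.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < messages; i++) {
            String caseId = "case-" + i; // hypothetical payload
            pool.execute(() -> saveToDb(caseId));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {saved.get(), maxRunning.get()};
    }

    public static void main(String[] args) {
        int[] result = new BoundedWorkers().processAll(50, 4, 8);
        // Peak concurrency is at most threads + 1, since the caller may run one task itself.
        System.out.println(result[0] + " saved, peak concurrency " + result[1]);
    }
}
```

CallerRunsPolicy is one way to apply backpressure. The point is that every buffer in the consumer has a fixed size.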

When dealing with AMQP, you really need to consider the business requirements of each process in order to design the optimal solution. How time-sensitive are your incoming messages? Do they need to be persisted to the database as soon as possible, or can your users tolerate a delay before the data becomes available?

If the data does not need to be persisted immediately, you could modify your application so that the consumer(s) simply remove messages from the queue and save them to a cached collection, in Redis, for example. Then introduce a second process that reads and processes the cached messages sequentially. This keeps the queue length from growing enough to trigger flow control. It also prevents your database from being bombarded with write requests, which are typically more expensive than reads. The consumers stay fast because their only job is to move messages out of the queue; the slow writes happen later in the other process.
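The "consume fast, write later" design can be sketched in-process to show the data flow. This is an illustration under stated assumptions, not a production design:

- A `LinkedBlockingQueue` stands in for the Redis collection.
- A `List` stands in for the database.
- `POISON` is a hypothetical end-of-stream marker used only by the sketch.

Unlike an external store such as Redis, the in-memory queue does not survive a process restart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-process sketch: a fast consumer fills a cache, a second worker drains it sequentially.
class BufferThenWrite {
    private static final String POISON = "__end__"; // hypothetical end-of-stream marker

    static List<String> run(List<String> incoming) {
        BlockingQueue<String> cache = new LinkedBlockingQueue<>(); // stand-in for Redis
        List<String> written = new ArrayList<>();                  // stand-in for the DB

        // Second process: takes cached messages one by one and writes them in order.
        Thread writer = new Thread(() -> {
            try {
                for (String m = cache.take(); !m.equals(POISON); m = cache.take()) {
                    written.add(m); // the slow dao.saveToDB(caseID) would go here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        writer.start();

        // Consumer: only moves each message from the broker queue into the cache,
        // so it keeps pace with the producer.
        try {
            for (String m : incoming) {
                cache.put(m);
            }
            cache.put(POISON);
            writer.join(); // join() makes the writer's additions visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while buffering", e);
        }
        return written;
    }
}
```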
