具有预取功能的Spring AMQP单一消费者并行处理 [英] Spring AMQP single consumer parallelism with prefetch
问题描述
我们有一个项目正在使用Spring-AMQP来消耗来自RabbitMQ代理的消息.我们希望增加使用方的并发性,以便多个工作线程可以并行处理消息.我首先阅读了本机RabbitMQ客户端的文档,这使我想到了使用单个使用者的设计,并且预取计数> 1以控制并行性.直接使用RabbitMQ客户端,这似乎很自然. DefaultConsumer
的handleDelivery
方法可以产生一个新的Runnable
来执行工作并在工作结束时确认该消息.预取计数有效地控制了使用者产生的最大Runnable
个数.
We have a project that's using Spring-AMQP to consume messages from our RabbitMQ broker. We would like to increase the concurrency on the consuming side so that multiple worker threads can process messages in parallel. I began by reading the documentation for the native RabbitMQ client, and that lead me to a design of using a single consumer, and a prefetch count > 1 to control for parallelism. Using the RabbitMQ client directly, this seems quite natural. The handleDelivery
method of the DefaultConsumer
can spawn a new Runnable
that does the work and acknowledges the message at the end of the work. The prefetch count effectively controls the maximum number of Runnable
s that the consumer spawns.
但是,这种设计似乎无法转化为Spring-AMQP领域.在SimpleMessageListenerContainer
中,对于每个AMQP使用者,所有消息都传递到单个BlockingQueueConsumer
中,并且单个线程将消息从BlockingQueueConsumer
的阻塞队列传递到MessageListener
.即使SimpleMessageListenerContainer
支持TaskExecutor
,TaskExecutor
也仅用于每个使用者运行一个任务.因此,要并行处理多个消息,必须使用多个使用者.
However, this design does not seem translatable to the Spring-AMQP world. In the SimpleMessageListenerContainer
, for each AMQP consumer, all messages get delivered into a single BlockingQueueConsumer
, and a single thread delivers messages from the BlockingQueueConsumer
's blocking queue to the MessageListener
. Even though the SimpleMessageListenerContainer
supports a TaskExecutor
, the TaskExecutor
is only used to run one task per consumer. So to process multiple messages in parallel one has to use multiple consumers.
这使我想到了一些关于Spring-AMQP并行性的问题.首先,我的单一用户和高预取数量的初始设计是通过AMQP实现并行性的有效方法吗?如果是这样,为什么Spring-AMQP避开该设计而转而采用每用户线程设计?是否可以自定义Spring-AMQP以使单用户并行性成为可能?
This leads me to a few questions about parallelism with Spring-AMQP. First, is my initial design with single consumer and high prefetch count a valid way to achieve parallelism with AMQP? If so, why does Spring-AMQP eschew this design in favor of a thread-per-consumer design? And is it possible to customize Spring-AMQP to make single-consumer parallelism possible?
推荐答案
Spring AMQP是针对较早版本的Rabbit客户端库设计的,该客户端客户端库的每个连接只有一个线程.
Spring AMQP was designed against an earlier version of the rabbit client library which only had one thread per connection.
DefaultConsumer的handleDelivery方法可以产生一个新的Runnable来执行工作并在工作结束时确认消息.
The handleDelivery method of the DefaultConsumer can spawn a new Runnable that does the work and acknowledges the message at the end of the work.
仅仅增加concurrentConsumers
并不会给您带来太多好处-唯一的区别是每个线程都有一个使用者,但是那里没有太多开销.
That doesn't really buy you a lot more than simply increasing the concurrentConsumers
- the only difference is there's a consumer for each thread, but there's not a whole lot of overhead there.
但是,您可以使用ChannelAwareMessageListener
做您想做的事,然后将ConfirmMode设置为MANUAL
-然后,您的侦听器负责确认消息.
You can do what you want, however, using a ChannelAwareMessageListener
and set the acknowledgeMode to MANUAL
- then your listener is responsible for acking the message(s).
在2.0(下一年)中,我们将有一个替代的侦听器容器,该容器将直接在客户端库线程上调用侦听器.但是,这是相当多的工作. 此(关闭)请求请求具有初始PoC ,但不是一个功能齐全的容器呢.
In 2.0 (next year) we will have an alternative listener container that will call the listener directly on the client library thread. But, it's a fair amount of work. This (closed) pull request has an initial PoC but it's not a fully featured container yet.
这篇关于具有预取功能的Spring AMQP单一消费者并行处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!