具有预取功能的Spring AMQP单一消费者并行处理 [英] Spring AMQP single consumer parallelism with prefetch

查看:140
本文介绍了具有预取功能的Spring AMQP单一消费者并行处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们有一个项目正在使用Spring-AMQP来消耗来自RabbitMQ代理的消息.我们希望增加使用方的并发性,以便多个工作线程可以并行处理消息.我首先阅读了本机RabbitMQ客户端的文档,这使我想到了使用单个使用者的设计,并且预取计数> 1以控制并行性.直接使用RabbitMQ客户端,这似乎很自然. DefaultConsumerhandleDelivery方法可以产生一个新的Runnable来执行工作并在工作结束时确认该消息.预取计数有效地控制了使用者产生的最大Runnable个数.

We have a project that's using Spring-AMQP to consume messages from our RabbitMQ broker. We would like to increase the concurrency on the consuming side so that multiple worker threads can process messages in parallel. I began by reading the documentation for the native RabbitMQ client, and that lead me to a design of using a single consumer, and a prefetch count > 1 to control for parallelism. Using the RabbitMQ client directly, this seems quite natural. The handleDelivery method of the DefaultConsumer can spawn a new Runnable that does the work and acknowledges the message at the end of the work. The prefetch count effectively controls the maximum number of Runnables that the consumer spawns.

但是,这种设计似乎无法转化为Spring-AMQP领域.在SimpleMessageListenerContainer中,对于每个AMQP使用者,所有消息都传递到单个BlockingQueueConsumer中,并且单个线程将消息从BlockingQueueConsumer的阻塞队列传递到MessageListener.即使SimpleMessageListenerContainer支持TaskExecutorTaskExecutor也仅用于每个使用者运行一个任务.因此,要并行处理多个消息,必须使用多个使用者.

However, this design does not seem translatable to the Spring-AMQP world. In the SimpleMessageListenerContainer, for each AMQP consumer, all messages get delivered into a single BlockingQueueConsumer, and a single thread delivers messages from the BlockingQueueConsumer's blocking queue to the MessageListener. Even though the SimpleMessageListenerContainer supports a TaskExecutor, the TaskExecutor is only used to run one task per consumer. So to process multiple messages in parallel one has to use multiple consumers.

这使我想到了一些关于Spring-AMQP并行性的问题.首先,我的单一用户和高预取数量的初始设计是通过AMQP实现并行性的有效方法吗?如果是这样,为什么Spring-AMQP避开该设计而转而采用每用户线程设计?是否可以自定义Spring-AMQP以使单用户并行性成为可能?

This leads me to a few questions about parallelism with Spring-AMQP. First, is my initial design with single consumer and high prefetch count a valid way to achieve parallelism with AMQP? If so, why does Spring-AMQP eschew this design in favor of a thread-per-consumer design? And is it possible to customize Spring-AMQP to make single-consumer parallelism possible?

推荐答案

Spring AMQP是针对较早版本的Rabbit客户端库设计的,该客户端客户端库的每个连接只有一个线程.

Spring AMQP was designed against an earlier version of the rabbit client library which only had one thread per connection.

DefaultConsumer的handleDelivery方法可以产生一个新的Runnable来执行工作并在工作结束时确认消息.

The handleDelivery method of the DefaultConsumer can spawn a new Runnable that does the work and acknowledges the message at the end of the work.

仅仅增加concurrentConsumers并不会给您带来太多好处-唯一的区别是每个线程都有一个使用者,但是那里没有太多开销.

That doesn't really buy you a lot more than simply increasing the concurrentConsumers - the only difference is there's a consumer for each thread, but there's not a whole lot of overhead there.

但是,您可以使用ChannelAwareMessageListener做您想做的事,然后将ConfirmMode设置为MANUAL-然后,您的侦听器负责确认消息.

You can do what you want, however, using a ChannelAwareMessageListener and set the acknowledgeMode to MANUAL - then your listener is responsible for acking the message(s).

在2.0(下一年)中,我们将有一个替代的侦听器容器,该容器将直接在客户端库线程上调用侦听器.但是,这是相当多的工作. 此(关闭)请求请求具有初始PoC ,但不是一个功能齐全的容器呢.

In 2.0 (next year) we will have an alternative listener container that will call the listener directly on the client library thread. But, it's a fair amount of work. This (closed) pull request has an initial PoC but it's not a fully featured container yet.

这篇关于具有预取功能的Spring AMQP单一消费者并行处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆