如何在Spring启动中实现循环队列使用者 [英] How to implement a round-robin queue consumer in Spring boot

查看:117
本文介绍了如何在Spring启动中实现循环队列使用者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在Spring中构建一个消息驱动服务,它将在集群中运行,需要以循环方式从RabbitMQ队列中提取消息。该实现目前正在先将消息从队列中拉出,导致某些服务器得到备份而其他服务器处于空闲状态。

I am building a message driven service in spring which will run in a cluster and needs to pull messages from a RabbitMQ queue in a round robin manner. The implementation is currently pulling messages off the queue in a first come basis leading to some servers getting backed up while others are idle.

当前的QueueConsumerConfiguration.java如下所示: / p>

The current QueueConsumerConfiguration.java looks like :

@Configuration
public class QueueConsumerConfiguration extends RabbitMqConfiguration {
private Logger LOG = LoggerFactory.getLogger(QueueConsumerConfiguration.class);

private static final int DEFAULT_CONSUMERS=2;

@Value("${eventservice.inbound}")
protected String inboudEventQueue;

@Value("${eventservice.consumers}")
protected int queueConsumers;

@Autowired
private EventHandler eventtHandler;

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setRoutingKey(this.inboudEventQueue);
    template.setQueue(this.inboudEventQueue);
    template.setMessageConverter(jsonMessageConverter());
    return template;
}

@Bean
public Queue inboudEventQueue() {
    return new Queue(this.inboudEventQueue);
}

@Bean
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueNames(this.inboudEventQueue);
    container.setMessageListener(messageListenerAdapter());
    if (this.queueConsumers > 0) {
        LOG.info("Starting queue consumers:" + this.queueConsumers );
        container.setMaxConcurrentConsumers(this.queueConsumers);
        container.setConcurrentConsumers(this.queueConsumers);
    } else {
        LOG.info("Starting default queue consumers:" + DEFAULT_CONSUMERS);
        container.setMaxConcurrentConsumers(DEFAULT_CONSUMERS);
        container.setConcurrentConsumers(DEFAULT_CONSUMERS);            
    }
    return container;
}

@Bean
public MessageListenerAdapter messageListenerAdapter() {
    return new MessageListenerAdapter(this.eventtHandler, jsonMessageConverter());
}
}

是否只是添加

container.setChannelTransacted(true);

到配置?

推荐答案

RabbitMQ对待所有消费者都是一样的 - 它知道一个容器中的多个消费者之间没有区别。多个容器中的一个消费者(例如,在不同的主机上)。从Rabbit的角度来看,每个人都是消费者。

RabbitMQ treats all consumers the same - it knows no difference between multiple consumers in one container Vs. one consumer in multiple containers (e.g. on different hosts). Each is a consumer from Rabbit's perspective.

如果你想要更多地控制服务器亲和力,你需要使用多个队列,每个容器都在监听它自己的队列。

If you want more control over server affinity, you need to use multiple queues with each container listening to its own queue.

然后,您可以控制生产者方面的分配 - 例如使用主题或直接交换和特定路由键将消息路由到特定队列。

You then control the distribution on the producer side - e.g. using a topic or direct exchange and specific routing keys to route messages to a specific queue.

这将生产者与消费者紧密联系在一起(他必须知道有多少人)。

This tightly binds the producer to the consumers (he has to know how many there are).

或者你可以让你的制作人使用路由键 rk.0,rk.1,...,rk.29 (反复,当达到30时重置为0)。

Or you could have your producer use routing keys rk.0, rk.1, ..., rk.29 (repeatedly, resetting to 0 when 30 is reached).

然后你可以用多个绑定绑定消费者队列 -

Then you can bind the consumer queues with multiple bindings -

消费者1获得rk.0至rk.9,2获得rk.10至rk.19等等。

consumer 1 gets rk.0 to rk.9, 2 gets rk.10 to rk.19, etc, etc.

如果你那么决定增加消费者的数量,只需适当地重构绑定以重新分配工作。

If you then decide to increase the number of consumers, just refactor the bindings appropriately to redistribute the work.

容器将按需扩展到maxConcurrentConsumers但实际上只会缩小比例当整个容器闲置一段时间。

The container will scale up to maxConcurrentConsumers on demand but, practically, scaling down only occurs when the entire container is idle for some time.

这篇关于如何在Spring启动中实现循环队列使用者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆