RabbitMQ:在运行时向侦听器动态添加队列 [英] RabbitMQ: Dynamic addition of queues to a listener at runtime

查看:595
本文介绍了RabbitMQ:在运行时向侦听器动态添加队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个用例,我需要使用在运行时发现的队列中的消息.

I have a use case where I need to consume messages from queues which are discovered at runtime.

这里我有一个配置类和侦听器类.我已经为两个现有队列定义了一个使用者,并希望使用可能在运行时发现的新队列中的消息,并遵循相同的命名约定,即 queue.animals.*

Here I have a config class and the listener class. I have defined a consumer for the two existing queues and want to consume messages from new queues which may be discovered at runtime and follow the same naming convention i.e. queue.animals.*

此外,我还有另一个服务,它将向我发送名为newQueues"的队列中新发现的队列名称.如果不需要,可以更改此方法,我们可以摆脱在newQueues"上发送消息的服务.

Also, I have another service which will send me the newly discovered queue name on a queue named "newQueues". This approach can be changed if not needed and we can get rid of the service sending messages on "newQueues".

@EnableRabbit
public class RabbitConfiguration implements RabbitListenerConfigurer {

    public static final String queue1= "queue.animals.cat";
    public static final String queue2= "queue.animals.dog";

    @Autowired
    private ConnectionFactory connectionFactory;

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory);
    }
    @Bean
    public DirectRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

    @Bean
    public RabbitListenerEndpointRegistry listenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setEndpointRegistry(listenerEndpointRegistry());
    }

@Autowired
private RabbitListenerEndpointRegistry listenerEndpointRegistry;

@RabbitListener(id = "qEvent", queues = {"newQueues"})
public void processQueueEvents(String newQueueName) {
    ((DirectMessageListenerContainer) this.listenerEndpointRegistry.getListenerContainer("animalQContainer"))
        .addQueueNames(newQueueName);

    System.out.println("Received a message with the new queue name: " + newQueueName);
    
}

@RabbitListener(id = "animalQContainer" , queues = { queue1, queue2 })
public void processAnimals(Animal animalObj, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
    System.out.println("Received a message on queue: " + queue + "data: " + animalObj);
    //process animalObj
}

我目前收到以下异常:

Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'queue': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')

我对 RabbitMQ 很陌生,所以不确定我是否正确地掌握了所有内容.感谢您的帮助.

I am very new to RabbitMQ, so not sure if I have all the pieces correctly. Thank you for your help.

推荐答案

由于这里配置了 Jackson2JsonMessageConverter,processQueueEvents 方法无法解析字符串.创建了一个新类并将对象传递给 processQueueEvent 方法以解决问题中提到的异常:

Since the Jackson2JsonMessageConverter is configured here, the processQueueEvents method can not parse a string. Created a new class and passed in an object to the processQueueEvent method to get past the exception mentioned in the question:

    public void processQueueEvents(NewQueue newQueueName) {
        System.out.println("Received a message on a new queue: " + newQueueName);
        String name = newQueueName.toString();

这篇关于RabbitMQ:在运行时向侦听器动态添加队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆