Configuring multiple vhosts in AMQP in a RabbitMQ Spring Boot configuration


Problem description

I'm implementing a project where I have to send messages across different vhosts in RabbitMQ. I'm using SimpleRoutingConnectionFactory but get java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]. Does anyone have an idea how to implement this? My configuration class is below.

@Configuration
@EnableRabbit
public class RabbitMQConfiguration {

@Autowired
ConnectionProperties connect;

// client1 exchanges
@Bean
public TopicExchange client1Exchange() {
    TopicExchange ex = new TopicExchange("ex_client1");
    ex.setAdminsThatShouldDeclare(client1());
    return ex;
}

// client2 exchange
@Bean
public TopicExchange client2Exchange() {
    TopicExchange ex = new TopicExchange("ex_client2");
    ex.setAdminsThatShouldDeclare(client2());
    return ex;
}

@Bean
public Queue client1Queue() {
    Queue queue = new Queue("client1_queue");
    queue.setAdminsThatShouldDeclare(client1());
    return queue;
}

@Bean
public Binding client1Binding() {
    Binding binding = BindingBuilder.bind(client1Queue())
            .to(client1Exchange())
            .with("client1_key");
    binding.setAdminsThatShouldDeclare(client1());
    return binding;
}


@Bean
public Queue client2Queue() {
    Queue queue = new Queue("client2_queue");
    queue.setAdminsThatShouldDeclare(client2());
    return queue;
}

@Bean
public Binding client2Binding() {
    Binding binding = BindingBuilder.bind(client2Queue())
            .to(client2Exchange())
            .with("client2_key");
    binding.setAdminsThatShouldDeclare(client2());
    return binding;
}

@Bean
@Primary
public ConnectionFactory connectionFactory() {
    SimpleRoutingConnectionFactory connectionFactory = new SimpleRoutingConnectionFactory();
    Map<Object, ConnectionFactory> targetConnectionFactories = new HashMap<>();
    targetConnectionFactories.put("client1", client1ConnectionFactory());
    targetConnectionFactories.put("client2", client2ConnectionFactory());
    connectionFactory.setTargetConnectionFactories(targetConnectionFactories);
    return connectionFactory;
}

@Bean
public ConnectionFactory client1ConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(connect.getRabbitMQHost());
    connectionFactory.setVirtualHost(connect.getRabbitMQClient1VHost());
    connectionFactory.setUsername(connect.getRabbitMQClient1User());
    connectionFactory.setPassword(connect.getRabbitMQClient1Pass());
    return connectionFactory;
}

@Bean
public ConnectionFactory client2ConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(connect.getRabbitMQHost());
    connectionFactory.setVirtualHost(connect.getRabbitMQClient2VHost());
    connectionFactory.setUsername(connect.getRabbitClient2User());
    connectionFactory.setPassword(connect.getRabbitClient2Pass());
    return connectionFactory;
}

// You can comment all methods below and remove interface's implementation to use the default serialization / deserialization
@Bean
public RabbitTemplate rabbitTemplate() {
    final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
    return rabbitTemplate;
}

@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
    return new MappingJackson2MessageConverter();
}

@Bean
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setMessageConverter(consumerJackson2MessageConverter());
    return factory;
}

@Bean
public TaskExecutor rabbitListenerExecutor() {
    int threads = Integer.valueOf(connect.getMinConsumers()) * 2; // threads = min consumers* no of queues
    final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(threads);
    executor.setMaxPoolSize(threads);
    executor.setThreadNamePrefix("RabbitThreadListener");
    executor.afterPropertiesSet();
    return executor;
}

@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrentConsumers(Integer.valueOf(connect.getMinConsumers()));
    factory.setPrefetchCount(Integer.valueOf(connect.getPrefetchCount()));
    factory.setTaskExecutor(rabbitListenerExecutor());
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return factory;
}

@Bean
public RabbitAdmin client1() {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(client1ConnectionFactory());
    rabbitAdmin.afterPropertiesSet();
    return rabbitAdmin;
}

@Bean
public RabbitAdmin client2() {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(client2ConnectionFactory());
    rabbitAdmin.afterPropertiesSet();
    return rabbitAdmin;
}

}

I get this stack trace:

o.s.a.r.l.SimpleMessageListenerContainer - Consumer raised exception, 
processing can restart if the connection factory supports it
java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
    at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:119)
    at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:97)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:90)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:140)
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:76)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:505)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1335)
    at java.lang.Thread.run(Thread.java:748)

Recommended answer

The RoutingConnectionFactory is generally used for publishing messages.
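For the publishing side, a minimal sketch of how a lookup key can be selected per send, assuming the RabbitTemplate is backed by the SimpleRoutingConnectionFactory from the question and that its target map contains the keys "client1" and "client2". The class name VhostPublisher and its send method are hypothetical helpers, not part of the original configuration.

```java
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.SimpleResourceHolder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

// Hypothetical helper (not in the original post): publishes through a
// routing connection factory by binding a lookup key to the current thread.
public class VhostPublisher {

    private final RabbitTemplate rabbitTemplate;

    public VhostPublisher(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void send(String lookupKey, String exchange, String routingKey, Object payload) {
        // Assumed to be the SimpleRoutingConnectionFactory wired into the template.
        ConnectionFactory routingCf = rabbitTemplate.getConnectionFactory();
        // The routing factory resolves its target from the key bound here.
        SimpleResourceHolder.bind(routingCf, lookupKey);
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, payload);
        } finally {
            // Always unbind so the key does not leak to later work on this thread.
            SimpleResourceHolder.unbind(routingCf);
        }
    }
}
```

With the names from the question, a call might look like send("client1", "ex_client1", "client1_key", payload). A listener container thread has no such binding, which is why it needs the queue-name-based key described next.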

When using a routing factory in a listener container, you must configure the lookup key to match the queue name(s) configured in the container.

From the documentation:

Also starting with version 1.4, you can configure a routing connection factory in a listener container. In that case, the list of queue names is used as the lookup key. For example, if you configure the container with setQueueNames("foo", "bar"), the lookup key will be "[foo,bar]" (no spaces).

So, if a RabbitListener listens to queue foo, the routing lookup key must be [foo]. (You can add the same CF to the map multiple times under different keys.)
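To make the key format concrete, here is a small plain-Java sketch that reproduces the format described in the documentation quote: the list of queue names rendered as a string with spaces removed. It only mimics the documented result; LookupKeyDemo and lookupKey are hypothetical names, and this is not the library's own code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class: reproduces the documented lookup key format
// for a listener container that uses a routing connection factory.
final class LookupKeyDemo {

    // Renders the queue names as a list string and strips the spaces,
    // e.g. ("foo", "bar") becomes "[foo,bar]".
    static String lookupKey(String... queueNames) {
        List<String> names = Arrays.asList(queueNames);
        return names.toString().replace(" ", "");
    }

    public static void main(String[] args) {
        System.out.println(lookupKey("foo"));        // prints [foo]
        System.out.println(lookupKey("foo", "bar")); // prints [foo,bar]
    }
}
```

The brackets are part of the key, so a map entry registered as "foo" would not match a listener on queue foo.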

Or you can simply create multiple container factories, each getting a concrete CF instead of the routing CF.

EDIT

Suppose you have

@RabbitListener(queues = "myQueue", containerFactory = "myRabbitListenerContainerFactory")
public void listen(...) {
    ...
}

If myQueue is in client1's vhost, then you need an entry like this in the routing CF map...

targetConnectionFactories.put("[myQueue]", client1ConnectionFactory());

...because the listener container generated for the listener will use the queue name in its lookup key.
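Applied to the configuration in the question, the routing bean might look like the sketch below. It assumes each listener consumes exactly one of the queues client1_queue or client2_queue, and that the beans client1ConnectionFactory and client2ConnectionFactory are defined as in the question. The class name RoutingConnectionFactorySketch is hypothetical, and the bean is meant as a replacement for the question's connectionFactory() method, not an addition to it.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

// Hypothetical configuration class illustrating the map entries only.
@Configuration
public class RoutingConnectionFactorySketch {

    @Bean
    @Primary
    public ConnectionFactory connectionFactory(
            @Qualifier("client1ConnectionFactory") ConnectionFactory client1Cf,
            @Qualifier("client2ConnectionFactory") ConnectionFactory client2Cf) {
        Map<Object, ConnectionFactory> targets = new HashMap<>();
        // Keys bound explicitly when publishing.
        targets.put("client1", client1Cf);
        targets.put("client2", client2Cf);
        // Keys derived from queue names by listener containers
        // (assumes one queue per listener).
        targets.put("[client1_queue]", client1Cf);
        targets.put("[client2_queue]", client2Cf);

        SimpleRoutingConnectionFactory routingCf = new SimpleRoutingConnectionFactory();
        routingCf.setTargetConnectionFactories(targets);
        return routingCf;
    }
}
```

A listener that consumes several queues at once would need a matching combined key such as "[client1_queue,other_queue]", and all of those queues would have to live in the same vhost.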

Alternatively, create two container factories, each wired directly to the client1 or client2 CF instead of the routing CF...

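@Bean
public SimpleRabbitListenerContainerFactory client1ListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(client1ConnectionFactory()); // concrete CF for client1's vhost
    return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory client2ListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(client2ConnectionFactory()); // concrete CF for client2's vhost
    return factory;
}

@RabbitListener(queues = "myQueue", containerFactory = "client1ListenerContainerFactory")
public void listen(...) {
    ...
}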

i.e. don't use the routing CF at all for listeners - containers only have one connection.
