在rabbitmq配置spring boot中在AMQP中配置多个Vhost [英] configuring multiple Vhosts in AMQP in rabbitmq configuration spring boot
问题描述
我正在实施一个项目,我必须在rabbitmq中的不同虚拟主机之间发送消息.使用 SimpleRoutingConnectionFactory 但得到 java.lang.IllegalStateException:无法确定查找键 [null] 的目标 ConnectionFactory.任何有想法如何在下面实现的人都是我的配置类代码.
i'm implementing a project where i have to send messages across different vhosts in rabbitmq. using SimpleRoutingConnectionFactory but get java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]. Anyone who has an idea how to implement such below is my configuration class code.
@Configuration
@EnableRabbit
public class RabbitMQConfiguration {
@Autowired
ConnectionProperties connect;
// client1 exchanges
@Bean
public TopicExchange client1Exchange() {
TopicExchange ex = new TopicExchange("ex_client1");
ex.setAdminsThatShouldDeclare(client1());
return ex;
}
// client2 exchange
@Bean
public TopicExchange client2Exchange() {
TopicExchange ex = new TopicExchange("ex_client2");
ex.setAdminsThatShouldDeclare(client2Admin());
return ex;
}
@Bean
public Queue client1Queue() {
Queue queue = new Queue("client1_queue");
queue.setAdminsThatShouldDeclare(client1());
return queue;
}
@Bean
public Binding client1Binding() {
Binding binding = BindingBuilder.bind(client1Queue())
.to(client1Exchange())
.with("client1_key");
binding.setAdminsThatShouldDeclare(client1());
return binding;
}
@Bean
public Queue client2Queue() {
Queue queue = new Queue("client2_queue");
queue.setAdminsThatShouldDeclare(client2());
return queue;
}
@Bean
public Binding client2Binding() {
Binding binding = BindingBuilder.bind(client2Queue())
.to(client2Exchange())
.with("client2_key");
binding.setAdminsThatShouldDeclare(client2());
return binding;
}
@Bean
@Primary
public ConnectionFactory connectionFactory() {
SimpleRoutingConnectionFactory connectionFactory = new SimpleRoutingConnectionFactory();
Map<Object, ConnectionFactory> targetConnectionFactories = new HashMap<>();
targetConnectionFactories.put("client1", client1ConnectionFactory());
targetConnectionFactories.put("client2", client2ConnectionFactory());
connectionFactory.setTargetConnectionFactories(targetConnectionFactories);
return connectionFactory;
}
@Bean
public ConnectionFactory client1ConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(connect.getRabbitMQHost());
connectionFactory.setVirtualHost(connect.getRabbitMQClient1VHost());
connectionFactory.setUsername(connect.getRabbitMQClient1User());
connectionFactory.setPassword(connect.getRabbitMQClient1Pass());
return connectionFactory;
}
@Bean
public ConnectionFactory client2ConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(connect.getRabbitMQHost());
connectionFactory.setVirtualHost(connect.getRabbitMQClient2VHost());
connectionFactory.setUsername(connect.getRabbitClient2User());
connectionFactory.setPassword(connect.getRabbitClient2Pass());
return connectionFactory;
}
// You can comment all methods below and remove interface's implementation to use the default serialization / deserialization
@Bean
public RabbitTemplate rabbitTemplate() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
return rabbitTemplate;
}
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
return new MappingJackson2MessageConverter();
}
@Bean
public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(consumerJackson2MessageConverter());
return factory;
}
@Bean
public TaskExecutor rabbitListenerExecutor() {
int threads = Integer.valueOf(connect.getMinConsumers()) * 2; // threads = min consumers* no of queues
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(threads);
executor.setMaxPoolSize(threads);
executor.setThreadNamePrefix("RabbitThreadListener");
executor.afterPropertiesSet();
return executor;
}
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(Integer.valueOf(connect.getMinConsumers()));
factory.setPrefetchCount(Integer.valueOf(connect.getPrefetchCount()));
factory.setTaskExecutor(rabbitListenerExecutor());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Bean
public RabbitAdmin client1() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(client1ConnectionFactory());
rabbitAdmin.afterPropertiesSet();
return rabbitAdmin;
}
@Bean
public RabbitAdmin client2() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(client2ConnectionFactory());
rabbitAdmin.afterPropertiesSet();
return rabbitAdmin;
}
}
我得到了这个堆栈跟踪
o.s.a.r.l.SimpleMessageListenerContainer - Consumer raised exception,
processing can restart if the connection factory supports it
java.lang.IllegalStateException: Cannot determine target ConnectionFactory for lookup key [null]
at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.determineTargetConnectionFactory(AbstractRoutingConnectionFactory.java:119)
at org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory.createConnection(AbstractRoutingConnectionFactory.java:97)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createConnection(ConnectionFactoryUtils.java:90)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:140)
at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:76)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:505)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1335)
at java.lang.Thread.run(Thread.java:748)
推荐答案
RoutingConnectionFactory
通常用于发布消息.
在侦听器容器中使用路由工厂时,您必须配置查找键以匹配容器中配置的队列名称.
When using a routing factory in a listener container you must configure the lookup key to match the queue name(s) configured in the container.
来自文档:
同样从 1.4 版开始,您可以在侦听器容器中配置路由连接工厂.在这种情况下,队列名称列表用作查找键.例如,如果您使用 setQueueNames("foo", "bar")
配置容器,则查找键将为 "[foo,bar]"
(无空格).
Also starting with version 1.4, you can configure a routing connection factory in a listener container. In that case, the list of queue names is used as the lookup key. For example, if you configure the container with
setQueueNames("foo", "bar")
, the lookup key will be"[foo,bar]"
(no spaces).
所以;如果 RabbitListener
侦听队列 foo
,则路由查找键必须为 [foo]
.(您可以使用不同的键多次添加相同的 CF).
So; if a RabbitListener
listens to queue foo
the routing lookup key must be [foo]
. (You can add the same CF multiple times with different keys).
或者您可以简单地创建多个容器工厂,每个工厂获得一个具体的 CF 而不是路由 CF.
Or you can simply create multiple container factories, with each getting a concrete CF instead of the routing CF.
编辑
假设你有
@RabbitListener(queues = "myQueue", connectionFactory = "myRabbitListenerContainerFactory")
public void listen(...) {
...
}
如果 myQueue
在 client1
的 vhost 中,那么您需要在路由器 CF 映射中添加一个条目...
If myQueue
is in client1
's vhost, then you need an entry in the router CF map thus...
targetConnectionFactories.put("[myQueue]", client1ConnectionFactory());
...因为为侦听器生成的侦听器容器将在其查找键中使用队列名称.
...because the listener container generated for the listener will use the queue name in its lookup key.
或者,创建2个容器工厂;每个都直接与 client1 和 client2 CF 连接,而不是路由 CF...
Alternatively, create 2 container factories; each wired directly with client1 and client2 CFs instead of the routing CF...
@Bean
public SimpleRabbitListenerContainerFactory client1ListenerContainerFactory() {
@Bean
public SimpleRabbitListenerContainerFactory client2ListenerContainerFactory() {
和
@RabbitListener(queues = "myQueue", connectionFactory = "client1ListenerContainerFactory")
public void listen(...) {
...
}
即根本不要为侦听器使用路由 CF - 容器只有一个连接.
i.e. don't use the routing CF at all for listeners - containers only have one connection.
这篇关于在rabbitmq配置spring boot中在AMQP中配置多个Vhost的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!