不同消费者的每条消息的 Spring Cloud Stream 主题 [英] Spring Cloud Stream topic per message for different consumers
问题描述
我要找的拓扑是
到目前为止,我还没有看到在 Cloud Stream 中定义每条消息的主题的方法.我知道消费者将被绑定到特定的主题,但是在将消息发送到交换之前,生产者如何设置每条消息的主题?
So far I have not seen a way to define the topic per message in Cloud Stream. I understand that the consumers will be bound to specific topic but how does the producer sets the topic per message before sending the message to the exchange?
source.output().send(MessageBuilder.withPayload(myMessage).build());
不提供任何方法来设置交换主题以路由到适当的消费者.
Does not provide any way to set the topic for the exchange to route to the proper consumer.
或者我可能没有正确理解某些东西?
Or maybe I don't understand something correctly?
更新
由于 bindingRoutingKey
是 2222
,我希望不会在消费者中收到消息,并且我使用 routeTo
发送1111
.但我仍然在消费者身上收到它.
I would expect not to receive the message in the consumer due to the bindingRoutingKey
being 2222
and I am sending with routeTo
1111
. But I still receive it on the consumer.
生产者属性:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.output.destination=messageExchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers['routeTo']
@EnableBinding(Source.class)
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
发件人:
source.output().send(MessageBuilder.withPayload(mo).setHeader("routeTo", "1111").build());
和消费者:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.cloud.stream.bindings.input.destination=messageExchange
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=2222
申请:
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void ReceiveMo(String moDTO) {
log.info("Message received moDTO: {}", moDTO);
}
}
第二次更新
以下已接受答案中的建议.我能够让它工作.需要使用其 UI 从 RabbitMQ 中删除交换和队列并重新启动 RabbitMQ docker 映像.
With the suggestions in the accepted answer below. I was able to make it work. Needed to remove the exchanges and queues from RabbitMQ using its UI and restart the RabbitMQ docker image.
推荐答案
routingKeyExpression
rabbitmq 生产者属性.
例如...producer.routing-key-expression=headers['routeTo']
然后
source.output().send(MessageBuilder.withPayload(myMessage)
.setHeader("routeTo", "Booking.new")
.build());
注意目的地是交易所名称.默认情况下,活页夹需要 Topic
交换.如果您希望改用直接交换,则必须设置 exchangeType
属性.
Note that the destination is the exchange name. By default, the binder expects a Topic
exchange. If you wish to use a Direct exchange instead, you must set the exchangeType
property.
这篇关于不同消费者的每条消息的 Spring Cloud Stream 主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!