不同消费者的每条消息的 Spring Cloud Stream 主题 [英] Spring Cloud Stream topic per message for different consumers

查看:21
本文介绍了不同消费者的每条消息的 Spring Cloud Stream 主题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我要找的拓扑是

到目前为止,我还没有看到在 Cloud Stream 中定义每条消息的主题的方法.我知道消费者将被绑定到特定的主题,但是在将消息发送到交换之前,生产者如何设置每条消息的主题?

So far I have not seen a way to define the topic per message in Cloud Stream. I understand that the consumers will be bound to specific topic but how does the producer sets the topic per message before sending the message to the exchange?

source.output().send(MessageBuilder.withPayload(myMessage).build());

不提供任何方法来设置交换主题以路由到适当的消费者.

Does not provide any way to set the topic for the exchange to route to the proper consumer.

或者我可能没有正确理解某些东西?

Or maybe I don't understand something correctly?

更新

由于 bindingRoutingKey2222,我希望不会在消费者中收到消息,并且我使用 routeTo 发送1111.但我仍然在消费者身上收到它.

I would expect not to receive the message in the consumer due to the bindingRoutingKey being 2222 and I am sending with routeTo 1111. But I still receive it on the consumer.

生产者属性:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.output.destination=messageExchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression=headers['routeTo']


@EnableBinding(Source.class)
@SpringBootApplication
public class Application {

   public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }

}

发件人:

source.output().send(MessageBuilder.withPayload(mo).setHeader("routeTo", "1111").build());

和消费者:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

spring.cloud.stream.bindings.input.destination=messageExchange
spring.cloud.stream.rabbit.bindings.input.consumer.bindingRoutingKey=2222

申请:

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

private static final Logger log = LoggerFactory.getLogger(Application.class);

public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
}

@StreamListener(Sink.INPUT)
public void ReceiveMo(String moDTO) {
    log.info("Message received moDTO: {}", moDTO);
}

}

第二次更新

以下已接受答案中的建议.我能够让它工作.需要使用其 UI 从 RabbitMQ 中删除交换和队列并重新启动 RabbitMQ docker 映像.

With the suggestions in the accepted answer below. I was able to make it work. Needed to remove the exchanges and queues from RabbitMQ using its UI and restart the RabbitMQ docker image.

推荐答案

routingKeyExpression rabbitmq 生产者属性.

例如...producer.routing-key-expression=headers['routeTo']

然后

source.output().send(MessageBuilder.withPayload(myMessage)
    .setHeader("routeTo", "Booking.new")
    .build());

注意目的地是交易所名称.默认情况下,活页夹需要 Topic 交换.如果您希望改用直接交换,则必须设置 exchangeType 属性.

Note that the destination is the exchange name. By default, the binder expects a Topic exchange. If you wish to use a Direct exchange instead, you must set the exchangeType property.

这篇关于不同消费者的每条消息的 Spring Cloud Stream 主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆