Spring Cloud Streams - 源和接收器的多个动态目的地 [英] Spring Cloud Streams - Multiple dynamic destinations for sources and sinks

查看:17
本文介绍了Spring Cloud Streams - 源和接收器的多个动态目的地的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的系统上有一个更改请求,它当前侦听多个频道并向多个频道发送消息,但现在目标名称将在数据库中并随时更改.我很难相信我是第一个遇到这种情况的人,但我看到的信息有限.

There was a change request on my system, which currently listens to multiple channels and send messages to multiple channels as well, but now the destination names will be in the database and change any time. I'm having trouble believing I'm the first one to come across this, but I see limited information out there.

我只找到了这两个...
动态接收器目的地:https://github.com/spring-cloud-stream-app-starters/router/tree/master/spring-cloud-starter-stream-sink-router,但是如何主动收听这些频道@StreamListener 是怎么做的?

All I found is these 2...
Dynamic sink destination: https://github.com/spring-cloud-stream-app-starters/router/tree/master/spring-cloud-starter-stream-sink-router, but how would that work to active listening to those channels the way it's done by @StreamListener?

动态源目的地:https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/source-samples/dynamic-destination-source/,这样做

Dynamic source destinations: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/source-samples/dynamic-destination-source/, which does this

@Bean
    @ServiceActivator(inputChannel = "sourceChannel")
    public ExpressionEvaluatingRouter router() {
        ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.id"));
        router.setDefaultOutputChannelName("default-output");
        router.setChannelResolver(resolver);
        return router;
    }

但是payload.id"是什么?那里指定的目的地在哪里?

But what's that "payload.id"? And where are the destinations specified there??

推荐答案

请随时改进我的回答,希望能帮助到其他人.

Feel free to improve my answer, I hope it will help others.

现在是代码(它在我的调试器中工作).这是一个示例,尚未准备好用于生产!

Now the code (It worked in my debugger). This is an example, not production ready!

import org.springframework.messaging.MessageChannel;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;


@Service
@EnableBinding
public class MessageSenderService {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @Transactional
    public void sendMessage(final String topicName, final String payload) {
        final MessageChannel messageChannel = resolver.resolveDestination(topicName);
        messageChannel.send(new GenericMessage<String>(payload));
    }
}

以及 Spring Cloud Stream 的配置.

And configuration for Spring Cloud Stream.

spring:
  cloud:
    stream:
      dynamicDestinations: output.topic.1,output.topic2,output.topic.3

我在这里找到了https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/index.html#dynamicdestination它将在 Spring Cloud Stream 版本 2+ 中工作.我使用 2.1.2

I found here https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/index.html#dynamicdestination It will work in spring Cloud Stream version 2+. I use 2.1.2

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>2.1.2.RELEASE</version>
</dependency>

<小时>

这是如何消费来自动态目的地的消息

https://stackoverflow.com/a/56148190/4587961

配置

spring:
  cloud:
    stream:
      default:
        consumer:
          concurrency: 2
          partitioned: true
      bindings:
        # inputs
        input:
          group: application_name_group
          destination: topic-1,topic-2
          content-type: application/json;charset=UTF-8

Java 消费者.

@Component
@EnableBinding(Sink.class)
public class CommonConsumer {

    private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);

    @StreamListener(target = Sink.INPUT)
    public void consumeMessage(final Message<Object> message) {
        logger.info("Received a message: \nmessage:\n{}", message.getPayload());

        final String topic = message.getHeaders().get("kafka_receivedTopic");

        // Here I define logic which handles messages depending on message headers and topic.
        // In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
    }
}

这篇关于Spring Cloud Streams - 源和接收器的多个动态目的地的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆