Kafka Streams: Define multiple Kafka Streams using Spring Cloud Stream for each set of topics

Problem Description

I am trying to do a simple POC with Kafka Streams. However, I get an exception while starting the application. I am using Spring-Kafka and Kafka-Streams 2.5.1 with Spring Boot 2.3.5. The Kafka Streams configuration is below.

@Configuration
public class KafkaStreamsConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) -> log
                .info("AAA Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processBBB() {
        return input -> input.peek((key, value) -> log
                .info("BBB Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processCCC() {
        return input -> input.peek((key, value) -> log
                .info("CCC Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    /*
    @Bean
    public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties) {
        final Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "groupId-1");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
        final KafkaStreams kafkaStreams = new KafkaStreams(kafkaStreamTopology(), props);
        kafkaStreams.start();
        return kafkaStreams;
    }

    @Bean
    public Topology kafkaStreamTopology() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(Arrays.asList(AAATOPIC, BBBInputTOPIC, CCCInputTOPIC));
        return streamsBuilder.build();
    } */
}

The application.yaml is configured as below. The idea is that I have 3 input topics and 3 output topics. The component takes input from each input topic and writes output to the corresponding output topic.

spring:
  application.name: consumerapp-1
  cloud:
    function:
      definition: processAAA;processBBB;processCCC
    stream:
      kafka.binder: 
          brokers: 127.0.0.1:9092
          autoCreateTopics: true
          auto-add-partitions: true
      kafka.streams.binder:
          configuration: 
            commit.interval.ms: 1000
            default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      bindings:
        processAAA-in-0:
          destination: aaaInputTopic
        processAAA-out-0:
          destination: aaaOutputTopic
        processBBB-in-0:
          destination: bbbInputTopic
        processBBB-out-0:
          destination: bbbOutputTopic
        processCCC-in-0:
          destination: cccInputTopic
        processCCC-out-0:
          destination: cccOutputTopic

Exception thrown:

Caused by: java.lang.IllegalArgumentException: Trying to prepareConsumerBinding public abstract void org.apache.kafka.streams.kstream.KStream.to(java.lang.String,org.apache.kafka.streams.kstream.Produced)  but no delegate has been set.
at org.springframework.util.Assert.notNull(Assert.java:201)
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory$KStreamWrapperHandler.invoke(KStreamBoundElementFactory.java:134)

Can anyone help me with Kafka Streams / Spring-Kafka code samples for processing multiple input and output topics?

Update: 21 January 2021

After removing all the kafkaStreams and kafkaStreamTopology bean configuration, I am getting the message below in an infinite loop. Message consumption is still not working. I have checked the subscriptions in application.yaml against the @Bean function definitions; they all look OK to me, but I still get this cross-wiring error. I have replaced the application.properties with the application.yaml above.

2021-01-21 14:12:43,336 WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1] [Consumer clientId=consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1-consumer, groupId=consumerapp-1] We received an assignment [cccParserTopic-0] that doesn't match our current subscription Subscribe(bbbParserTopic); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription

Recommended Answer

I have managed to solve the problem, and I am writing this up for the benefit of others. If you want to include multiple streams in a single application jar, the key is to define multiple application IDs, one per stream. I knew this all along, but I was not aware of how to define it; the answer is something I managed to dig out after reading the SCSt documentation. Below is how the application.yaml can be defined.

spring:
  application.name: kafkaMultiStreamConsumer
  cloud:
    function:
      definition: processAAA;processBBB;processCCC # needed for the imperative @StreamListener style
    stream:
      kafka: 
        binder:
          brokers: 127.0.0.1:9092
          min-partition-count: 3
          replication-factor: 2
          transaction:
            transaction-id-prefix: transaction-id-2000
          autoCreateTopics: true
          auto-add-partitions: true
        streams:
          binder:
            # needed for the functional style
            functions:
              processBBB: 
                application-id: SampleBBBapplication
              processAAA: 
                application-id: SampleAAAapplication
              processCCC: 
                application-id: SampleCCCapplication
            configuration: 
              commit.interval.ms: 1000            
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde        
      bindings:
        # Below is for the imperative style of programming, using
        # the @StreamListener and @SendTo annotations in the .java class.
        inputAAA:
          destination: aaaInputTopic
        outputAAA:
          destination: aaaOutputTopic
        inputBBB:
          destination: bbbInputTopic
        outputBBB:
          destination: bbbOutputTopic
        inputCCC:
          destination: cccInputTopic
        outputCCC:
          destination: cccOutputTopic
        # Functional style programming using Function<KStream...>. Use either one of them,
        # as both are not required. If you use both it is OK, but only one of them works;
        # from what I have seen, @StreamListener is always the one triggered.
        # Below is the functional style.
        processAAA-in-0:
          destination: aaaInputTopic
          group: processAAA-group
        processAAA-out-0:
          destination: aaaOutputTopic
          group: processAAA-group
        processBBB-in-0:
          destination: bbbInputTopic
          group: processBBB-group
        processBBB-out-0:
          destination: bbbOutputTopic
          group: processBBB-group
        processCCC-in-0:
          destination: cccInputTopic
          group: processCCC-group
        processCCC-out-0:
          destination: cccOutputTopic
          group: processCCC-group

Once the above is defined, we now need to define the individual Java classes where the stream processing logic is implemented. Your Java class can be something like below; create similar classes for the other 2 (or N) streams as per your requirement. One example is below: AAASampleStreamTask.java

@Component
@EnableBinding(AAASampleChannel.class) // One Channel interface corresponding to in-topic and out-topic
public class AAASampleStreamTask {
    private static final Logger log = LoggerFactory.getLogger(AAASampleStreamTask.class);

    @StreamListener(AAASampleChannel.INPUT)
    @SendTo(AAASampleChannel.OUTPUT)
    public KStream<String, String> processAAA(KStream<String, String> input) {
        input.foreach((key, value) -> log.info("Annotation AAA *Sample* Cloud Stream Kafka Stream processing {}", String.valueOf(System.currentTimeMillis())));
       ...
       // do other business logic
       ...
        return input;
    }
    
    /**
     * Use the style above or the one below. The style below is the newer one, starting
     * from SCSt 3.0 if I am not wrong. These are 2 different styles of consuming Kafka
     * Streams using SCSt. If we have both, the one above gets priority, as per my observation.
     */     
    /* 
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) -> log.info(
                "Functional AAA *Sample* Cloud Stream Kafka Stream processing : {}", String.valueOf(System.currentTimeMillis())));
       ...
     // do other business logic
       ...
    }
    */
}
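
If you prefer to stay purely with the functional style, a configuration class along the lines of the sketch below could stand in for the channel/listener pair for the BBB stream. This is a minimal illustration, not code from the original post: the class name BBBSampleStreamConfig and the mapValues step are assumptions; only the bean name processBBB and the String/String types come from the configuration above.

import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Illustrative sketch, functional style only: no @EnableBinding, @StreamListener or
// channel interface. The bean name "processBBB" must match the function name used in
// spring.cloud.function.definition and in the processBBB-in-0 / processBBB-out-0 bindings.
@Configuration
public class BBBSampleStreamConfig {
    private static final Logger log = LoggerFactory.getLogger(BBBSampleStreamConfig.class);

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processBBB() {
        return input -> input
                .peek((key, value) -> log.info("Functional BBB *Sample* Cloud Stream Kafka Stream processing {}", value))
                .mapValues(value -> value.toUpperCase()); // placeholder for the real business logic
    }
}

The returned KStream is routed to the processBBB-out-0 binding (bbbOutputTopic) by the Kafka Streams binder.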

The Channel interface is required only if you want to go with the imperative style of programming; it is not needed for the functional style. AAASampleChannel.java

public interface AAASampleChannel {
    String INPUT = "inputAAA";
    String OUTPUT = "outputAAA";

    @Input(INPUT)
    KStream<String, String> inputAAA();

    @Output(OUTPUT)
    KStream<String, String> outputAAA();
}
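
As a quick smoke test (not part of the original post), a plain Spring-Kafka producer such as the sketch below can publish one record to each input topic; each of the three application IDs should then be seen consuming independently in the logs. The class name SampleInputProducer and the record contents are made up for illustration; the broker address and topic names are taken from the application.yaml above.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

// Hypothetical stand-alone smoke test; run it against the same broker as the stream app.
public class SampleInputProducer {

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        KafkaTemplate<String, String> template =
                new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));

        // One record per input topic; each should show up in the corresponding stream's log.
        template.send("aaaInputTopic", "key-1", "hello-aaa");
        template.send("bbbInputTopic", "key-1", "hello-bbb");
        template.send("cccInputTopic", "key-1", "hello-ccc");

        template.flush();
    }
}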
