Kafka Streams:使用 Spring Cloud Stream 为每组主题定义多个 Kafka Streams [英] Kafka Streams: Define multiple Kafka Streams using Spring Cloud Stream for each set of topics
问题描述
我正在尝试使用 Kafka Streams 做一个简单的 POC.但是,我在启动应用程序时遇到异常.我正在使用 Spring-Kafka、Kafka-Streams 2.5.1 和 Spring Boot 2.3.5Kafka流配置
I am trying to do a simple POC with Kafka Streams. However I am getting exception while starting the application. I am using Spring-Kafka, Kafka-Streams 2.5.1 with Spring boot 2.3.5 Kafka stream configuration
@Configuration
public class KafkaStreamsConfig {
private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);
@Bean
public Function<KStream<String, String>, KStream<String, String>> processAAA() {
return input -> input.peek((key, value) -> log
.info("AAA Cloud Stream Kafka Stream processing : {}", input.toString().length()));
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> processBBB() {
return input -> input.peek((key, value) -> log
.info("BBB Cloud Stream Kafka Stream processing : {}", input.toString().length()));
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> processCCC() {
return input -> input.peek((key, value) -> log
.info("CCC Cloud Stream Kafka Stream processing : {}", input.toString().length()));
}
/*
@Bean
public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties) {
final Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "groupId-1"););
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
final KafkaStreams kafkaStreams = new KafkaStreams(kafkaStreamTopology(), props);
kafkaStreams.start();
return kafkaStreams;
}
@Bean
public Topology kafkaStreamTopology() {
final StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream(Arrays.asList(AAATOPIC, BBBInputTOPIC, CCCInputTOPIC));
return streamsBuilder.build();
} */
}
application.yaml 配置如下.这个想法是我有 3 个输入和 3 个输出主题.该组件从输入主题获取输入并将输出提供给输出主题.
application.yaml configured is like below. The idea is that I have 3 input and 3 output topics. The component takes input from input topic and gives output to outputtopic.
spring:
application.name: consumerapp-1
cloud:
function:
definition: processAAA;processBBB;processCCC
stream:
kafka.binder:
brokers: 127.0.0.1:9092
autoCreateTopics: true
auto-add-partitions: true
kafka.streams.binder:
configuration:
commit.interval.ms: 1000
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
processAAA-in-0:
destination: aaaInputTopic
processAAA-out-0:
destination: aaaOutputTopic
processBBB-in-0:
destination: bbbInputTopic
processBBB-out-0:
destination: bbbOutputTopic
processCCC-in-0:
destination: cccInputTopic
processCCC-out-0:
destination: cccOutputTopic
抛出的异常是
Caused by: java.lang.IllegalArgumentException: Trying to prepareConsumerBinding public abstract void org.apache.kafka.streams.kstream.KStream.to(java.lang.String,org.apache.kafka.streams.kstream.Produced) but no delegate has been set.
at org.springframework.util.Assert.notNull(Assert.java:201)
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory$KStreamWrapperHandler.invoke(KStreamBoundElementFactory.java:134)
谁能帮我处理 Kafka Streams Spring-Kafka 代码示例以处理多个输入和输出主题.
Can anyone help me with Kafka Streams Spring-Kafka code samples for processing with multiple input and output topics.
更新:2021 年 1 月 21 日
在删除所有 kafkaStreams 和 kafkaStreamsTopology bean 配置后,我在无限循环中收到以下消息.消息消费仍然不起作用.我已经使用 @Bean 函数定义检查了 application.yaml 中的订阅.他们对我来说都很好,但我仍然遇到这个交叉接线错误.我已经用上面的application.yaml替换了application.properties
After removing all kafkaStreams and kafkaStreamsTopology beans configuration iam getting below message in an infinite loop. The messages consumption is still not working. I have checked the subscription in application.yaml with the @Bean Function definitions. they all look ok to me but still I get this cross wiring error. I have replaced the application.properties with application.yaml above
[consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1] [Consumer clientId=consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1-consumer, groupId=consumerapp-1] We received an assignment [cccParserTopic-0] that doesn't match our current subscription Subscribe(bbbParserTopic); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription
2021-01-21 14:12:43,336 WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1] [Consumer clientId=consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1-consumer, groupId=consumerapp-1] We received an assignment [cccParserTopic-0] that doesn't match our current subscription Subscribe(bbbParserTopic); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription
推荐答案
我已经设法解决了这个问题.我写这篇文章是为了他人的利益.如果您想在单个应用程序 jar 中包含多个流,那么关键是定义多个应用程序 ID,每个流一个.我一直都知道这一点,但我不知道如何定义它.最后,答案是我在阅读 SCSt 文档后设法挖掘出来的.下面是如何定义 application.yaml.application.yaml 如下所示
I have managed to solve the problem. I am writing this for the benefit of others. If you want to include multiple streams in your single app jar then the key is in defining multiple application Ids that is one per each of your streams. I knew this all along but I was not aware on how to define it. Finally the answer is something I have managed to dig out after reading the SCSt documentation. Below is how the application.yaml can be defined. application.yaml is like below
spring:
application.name: kafkaMultiStreamConsumer
cloud:
function:
definition: processAAA; processBBB; processCCC --> // needed for Imperative @StreamListener
stream:
kafka:
binder:
brokers: 127.0.0.1:9092
min-partition-count: 3
replication-factor: 2
transaction:
transaction-id-prefix: transaction-id-2000
autoCreateTopics: true
auto-add-partitions: true
streams:
binder:
functions:
// needed for functional
processBBB:
application-id: SampleBBBapplication
processAAA:
application-id: SampleAAAapplication
processCCC:
application-id: SampleCCCapplication
configuration:
commit.interval.ms: 1000
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
bindings:
// Below is for Imperative Style programming using
// the annotation namely @StreamListener, @SendTo in .java class
inputAAA:
destination: aaaInputTopic
outputAAA:
destination: aaaOutputTopic
inputBBB:
destination: bbbInputTopic
outputBBB:
destination: bbbOutputTopic
inputCCC:
destination: cccInputTopic
outputCCC:
destination: cccOutputTopic
// Functional Style programming using Function<KStream...> use either one of them
// as both are not required. If you use both its ok but only one of them works
// from what i have seen @StreamListener is triggered always.
// Below is from functional style
processAAA-in-0:
destination: aaaInputTopic
group: processAAA-group
processAAA-out-0:
destination: aaaOutputTopic
group: processAAA-group
processBBB-in-0:
destination: bbbInputTopic
group: processBBB-group
processBBB-out-0:
destination: bbbOutputTopic
group: processBBB-group
processCCC-in-0:
destination: cccInputTopic
group: processCCC-group
processCCC-out-0:
destination: cccOutputTopic
group: processCCC-group
在上面定义之后,我们现在需要定义实现流处理逻辑的各个 java 类.您的 Java 类可以如下所示.根据您的要求为其他 2 个或 N 个流进行类似的创建.一个例子如下:AAASampleStreamTask.java
Once above is defined we now need to define individual java classes where the Stream processing logic is implemented. Your Java class can be something like below. Create similarly for other 2 or N streams as per your requirement. One example is like below : AAASampleStreamTask.java
@Component
@EnableBinding(AAASampleChannel.class) // One Channel interface corresponding to in-topic and out-topic
public class AAASampleStreamTask {
private static final Logger log = LoggerFactory.getLogger(AAASampleStreamTask.class);
@StreamListener(AAASampleChannel.INPUT)
@SendTo(AAASampleChannel.OUTPUT)
public KStream<String, String> processAAA(KStream<String, String> input) {
input.foreach((key, value) -> log.info("Annotation AAA *Sample* Cloud Stream Kafka Stream processing {}", String.valueOf(System.currentTimeMillis())));
...
// do other business logic
...
return input;
}
/**
* Use above or below. Below style is latest startting from ScSt 3.0 if iam not
* wrong. 2 different styles of consuming Kafka Streams using SCSt. If we have
* both then above gets priority as per my observation
*/
/*
@Bean
public Function<KStream<String, String>, KStream<String, String>> processAAA() {
return input -> input.peek((key, value) -> log.info(
"Functional AAA *Sample* Cloud Stream Kafka Stream processing : {}", String.valueOf(System.currentTimeMillis())));
...
// do other business logic
...
}
*/
}
如果您想使用命令式编程而不是函数式编程,则需要 Channel.AAASampleChannel.java
The Channel is required if you want to go with Imperative style programming not for functional. AAASampleChannel.java
public interface AAASampleChannel {
String INPUT = "inputAAA";
String OUTPUT = "outputAAA";
@Input(INPUT)
KStream<String, String> inputAAA();
@Output(OUTPUT)
KStream<String, String> outputAAA();
}
这篇关于Kafka Streams:使用 Spring Cloud Stream 为每组主题定义多个 Kafka Streams的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!