Spring Cloud Stream Kafka Streams Binder KafkaException:无法启动流:'listener'不能为空 [英] Spring Cloud Stream Kafka Streams Binder KafkaException: Could not start stream: 'listener' cannot be null
问题描述
我是 Kafka Streams 和 Spring Cloud Stream 的新手,但在将集成相关代码移动到属性文件方面阅读了有关它的好消息,因此开发人员可以主要关注事物的业务逻辑方面.
I am new to Kafka Streams and Spring Cloud Stream but have read good things about it in terms of moving the integration related codes into properties file so devs can focus mostly on the business logic side of things.
这里有我的简单应用程序类.
Here I have my simple application class.
package com.some.events.consumer
import com.some.events.SomeEvent
import org.apache.kafka.streams.kstream.KStream
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import java.util.function.Consumer
@SpringBootApplication
class ConsumerApplication {
@Bean
fun consume(): Consumer<KStream<String, SomeEvent>> {
return Consumer { input -> input.foreach { key, value -> println("Key: $key, value: $value") } }
}
}
fun main(args: Array<String>) {
runApplication<ConsumerApplication>(*args)
}
我的application.yml
文件如下.
spring:
cloud:
function:
definition: consume
stream:
bindings:
consume-in-0:
destination: "some-event"
group: "some-event"
我在build.gradle.kts
中的依赖定义如下(这里只包含相关的)
My dependencies in build.gradle.kts
are defined as follows (just included the relevant ones here).
extra["springCloudVersion"] = "2020.0.2"
dependencies {
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.cloud:spring-cloud-stream")
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
testImplementation("org.springframework.boot:spring-boot-starter-test")
}
dependencyManagement {
imports {
mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
}
}
运行应用程序时出现以下异常.
When I run the application I got the following exception.
org.springframework.context.ApplicationContextException: Failed to start bean 'streamsBuilderFactoryManager'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.5.jar:5.3.5]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:769) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:761) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:426) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:326) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1313) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1302) ~[spring-boot-2.4.4.jar:2.4.4]
at com.some.events.consumer.ConsumerApplicationKt.main(ConsumerApplication.kt:22) ~[main/:na]
Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:94) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.5.jar:5.3.5]
... 14 common frames omitted
Caused by: java.lang.IllegalArgumentException: 'listener' cannot be null
at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.5.jar:5.3.5]
at org.springframework.kafka.config.StreamsBuilderFactoryBean.addListener(StreamsBuilderFactoryBean.java:268) ~[spring-kafka-2.6.7.jar:2.6.7]
at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:84) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
... 15 common frames omitted
Process finished with exit code 1
请注意,我知道我需要配置 Serde 和 Avro 相关的东西(我将 Avro 用于事件架构),但问题是,流甚至不会运行.
Note that I am aware that I need to configure the Serde and Avro related things (I am using Avro for event schema), but the thing is, the stream won't even run.
有人能指出我正确的方向吗?我尝试在谷歌上搜索此问题,但没有人发布过由侦听器"引起的问题不能为空.谢谢!
Can someone point me in the right direction? I tried googling this but no one has posted an issue where it's caused by 'listener' cannot be null. Thanks!
推荐答案
这是一个错误;它已在 3.1.3-SNAPSHOT 中修复
This is a bug; it is fixed in the 3.1.3-SNAPSHOT
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1030#issuecomment-804039087
我不确定那里的评论;将千分尺添加到类路径应该可以解决它.
I am not sure about the comment there; adding micrometer to the class path should resolve it.
这篇关于Spring Cloud Stream Kafka Streams Binder KafkaException:无法启动流:'listener'不能为空的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!