Spring Cloud Stream Kafka Streams Binder KafkaException: Could not start stream: 'listener' cannot be null


Question

I am new to Kafka Streams and Spring Cloud Stream, but I have read good things about them: the integration-related code moves into properties files, so developers can focus mostly on the business logic.

Here is my simple application class.

package com.some.events.consumer

import com.some.events.SomeEvent
import org.apache.kafka.streams.kstream.KStream
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import java.util.function.Consumer

@SpringBootApplication
class ConsumerApplication {
    @Bean
    fun consume(): Consumer<KStream<String, SomeEvent>> {
        return Consumer { input -> input.foreach { key, value -> println("Key: $key, value: $value") } }
    }
}

fun main(args: Array<String>) {
    runApplication<ConsumerApplication>(*args)
}

My application.yml file is as follows.

spring:
  cloud:
    function:
      definition: consume
    stream:
      bindings:
        consume-in-0:
          destination: "some-event"
          group: "some-event"

My dependencies in build.gradle.kts are defined as follows (only the relevant ones are included here).

extra["springCloudVersion"] = "2020.0.2"

dependencies {
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.cloud:spring-cloud-stream")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

dependencyManagement {
    imports {
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
    }
}

When I run the application, I get the following exception.

org.springframework.context.ApplicationContextException: Failed to start bean 'streamsBuilderFactoryManager'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:181) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356) ~[spring-context-5.3.5.jar:5.3.5]
    at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:935) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:586) ~[spring-context-5.3.5.jar:5.3.5]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:769) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:761) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:426) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:326) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1313) ~[spring-boot-2.4.4.jar:2.4.4]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1302) ~[spring-boot-2.4.4.jar:2.4.4]
    at com.some.events.consumer.ConsumerApplicationKt.main(ConsumerApplication.kt:22) ~[main/:na]
Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
    at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:94) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178) ~[spring-context-5.3.5.jar:5.3.5]
    ... 14 common frames omitted
Caused by: java.lang.IllegalArgumentException: 'listener' cannot be null
    at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.5.jar:5.3.5]
    at org.springframework.kafka.config.StreamsBuilderFactoryBean.addListener(StreamsBuilderFactoryBean.java:268) ~[spring-kafka-2.6.7.jar:2.6.7]
    at org.springframework.cloud.stream.binder.kafka.streams.StreamsBuilderFactoryManager.start(StreamsBuilderFactoryManager.java:84) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
    ... 15 common frames omitted


Process finished with exit code 1

Note that I am aware that I still need to configure the Serde and the Avro-related settings (I am using Avro for the event schema), but the problem is that the stream won't even start.

Can someone point me in the right direction? I tried googling this, but I could not find anyone reporting an issue caused by 'listener' cannot be null. Thanks!

Recommended answer

This is a bug; it is fixed in 3.1.3-SNAPSHOT.

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/f25dbff2b7fc0d0c742dd674a9e392057a34c86d

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1030#issuecomment-804039087

I am not sure about the comment there; adding Micrometer to the classpath should resolve it.
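
As a minimal sketch of that workaround, the question's build.gradle.kts could gain one extra dependency. The artifact coordinates io.micrometer:micrometer-core are an assumption (the answer only says "micrometer"), and the missing version assumes that Spring Boot's dependency management supplies it, as it does for the other unversioned dependencies in the question.

```kotlin
// build.gradle.kts (sketch): the question's binder dependencies plus Micrometer
dependencies {
    implementation("org.springframework.cloud:spring-cloud-stream")
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
    // Assumption: micrometer-core is the artifact to add. No version is given because
    // Spring Boot's dependency management is expected to provide one.
    implementation("io.micrometer:micrometer-core")
}
```

Once a binder release containing the fix is in use, this extra dependency should no longer be needed for the stream to start.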
