Spring Cloud Stream Kafka - 方法必须是声明性的 [英] Spring Cloud Stream Kafka - Method must be Declarative

查看:25
本文介绍了Spring Cloud Stream Kafka - 方法必须是声明性的的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经使用 spring cloud stream 配置了一个基于 spring boot 的应用程序.我正在尝试使用 KStream,但我不断收到错误java.lang.IllegalArgumentException:方法必须是声明性的".有人可以帮助我了解如何进行配置吗?我查看了 StreamListener 文档,但无法使其正常工作.

I have configured a spring boot based application with spring cloud stream. I am trying to work on a KStream but I keep getting error "java.lang.IllegalArgumentException: Method must be declarative". Can someone help me understand how do I get this configured? I looked up the StreamListener documentation but I wasn't able to get it to work.

https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RC2/api/org/springframework/cloud/stream/annotation/StreamListener.html

配置

spring.cloud.stream.kafka.binder.brokers=localhost
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.default.consumer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.concurrency=3
spring.cloud.stream.bindings.input.destination=myTopic
spring.cloud.stream.bindings.input.group=myGroup

应用

/**
 * This works
 */
@StreamListener(Sink.INPUT)
public void process (String event) {

   ...
}

/**
 * This doesn't work
 */
@StreamListener(Sink.INPUT)
public void process (KStream<String, String> event) {

   ...
}

错误

java.lang.IllegalArgumentException: Method must be declarative
    at org.springframework.util.Assert.isTrue(Assert.java:118) ~[spring-core-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.validateStreamListenerMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:503) ~[spring-cloud-stream-binder-kafka-streams-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:162) ~[spring-cloud-stream-binder-kafka-streams-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:195) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:167) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_172]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:863) ~[spring-beans-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142) ~[spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at com.abc.xyz.Application.main(Application.java:43) [classes/:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_172]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_172]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_172]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_172]
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) [spring-boot-devtools-2.1.2.RELEASE.jar:2.1.2.RELEASE]

编辑 1: 添加 Pom.xml

Edit 1: Adding Pom.xml

pom

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
    <spring-cloud-stream.version>Fishtown.RELEASE</spring-cloud-stream.version>
</properties>

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>

编辑 2:添加@Input 后​​添加更改

Edit 2: Adding changes after adding @Input

@StreamListener
public void process (@Input(Sink.INPUT) KStream<String, String> event) {

    System.out.println(event);
}

错误

java.lang.IllegalStateException: java.lang.ClassCastException: org.springframework.cloud.stream.messaging.DirectWithAttributesChannel cannot be cast to org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory$KStreamWrapper
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.adaptAndRetrieveInboundArguments(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:308) ~[spring-cloud-stream-binder-kafka-streams-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:164) ~[spring-cloud-stream-binder-kafka-streams-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:195) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:167) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_172]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:863) ~[spring-beans-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546) ~[spring-context-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:142) ~[spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.2.RELEASE.jar:2.1.2.RELEASE]
    at abc.xyz.apps.Application.main(Application.java:43) [classes/:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_172]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_172]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_172]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_172]
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) [spring-boot-devtools-2.1.2.RELEASE.jar:2.1.2.RELEASE]
Caused by: java.lang.ClassCastException: org.springframework.cloud.stream.messaging.DirectWithAttributesChannel cannot be cast to org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory$KStreamWrapper
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.adaptAndRetrieveInboundArguments(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:268) ~[spring-cloud-stream-binder-kafka-streams-2.1.0.RELEASE.jar:2.1.0.RELEASE]
    ... 21 more

推荐答案

正如在您指向的 javadocs 中 Declarative mode 下所说的那样,您需要 @Input 在参数...

As it says under Declarative mode in the javadocs you pointed to, you need @Input on the parameter...

@StreamListener
public void process(@Input(MySink.INPUT) KStream<String, String> event) {

   ...
}

.

interface MySink {

    @Input("input")
    KStream<?, ?> input();

}

这篇关于Spring Cloud Stream Kafka - 方法必须是声明性的的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆