Spring Cloud Streams 应用程序抛出嵌套异常是域模型类的 java.lang.ClassCastException [英] Spring Cloud Streams application throwing nested exception is java.lang.ClassCastException for a domain model class

本文介绍了Spring Cloud Streams 应用程序抛出嵌套异常是域模型类的 java.lang.ClassCastException的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在为 Kafka 试用最新的 Spring 云流框架.但是,对于 String 和 Double 其工作正常,但是当我尝试发送 Java POJO 类时,它会引发以下异常.

Hi I am trying out the latest Spring cloud stream framework for Kafka. However, for String and Double its working fine but when I try to send a Java POJO class, it throws the below exception.

我尝试了各种用于序列化和反序列化的配置,但似乎没有任何效果.我能够以 json 格式从供应商处生成消息,但由于错误,消费者无法处理它.

I have tried various configuration for serialization and deserialization but nothing seems to be working. I am able to produce the message from the Supplier, as json but the consumers are not able to process it due to the error.

对此问题的任何建议将不胜感激.谢谢

Any suggestions for the issue would be appreciated. Thanks

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@25625f0f]; nested exception is java.lang.ClassCastException: class [B cannot be cast to class com.msg.bt.Grade ([B is in module java.base of loader 'bootstrap'; com.msg.bt.Grade is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @475530b9), failedMessage=GenericMessage [payload=byte[72], headers={kafka_offset=804, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@24ee2e4c, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=random-topic-2, kafka_receivedTimestamp=1624778145945, kafka_groupId=mypublish-reader-group, target-protocol=kafka}]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2371) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2339) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2300) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2214) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2139) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2021) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1703) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@25625f0f]; nested exception is java.lang.ClassCastException: class [B cannot be cast to class com.msg.bt.Grade ([B is in module java.base of loader 'bootstrap'; com.msg.bt.Grade is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @475530b9)
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.8.jar!/:5.3.8]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.8.jar!/:5.3.8]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.8.jar!/:5.3.8]
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.8.jar!/:5.3.8]
        at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:398) ~[spring-integration-kafka-5.5.1.jar!/:5.5.1]

下面是POM.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.sample</groupId>
    <artifactId>message</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>message</name>
    <description>Spring messaging sample</description>
    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>2020.0.3</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

下面是应用程序

@Bean
public Supplier<Grade> random() {
    return () -> {
        System.out.println("=========In Supplier --------------------");
        return new Grade(UUID.randomUUID().toString(),Math.random()*100);
    };
}

@Bean
public Function<Grade, String> publish() {
    System.out.println("@@@@@@@@@@@@@@In Function user--------------------");
    return user -> "User:: " + user.toString();
}

@Bean
public Consumer<String> log() {
    System.out.println("++++++++++++In log --------------------");
    return s -> {
        System.out.println("Received>> " + s);
    };
}

应用程序.yml

spring:
  application:
    name: kafka-messaging
  json:
    value:
      default:
        type: com.msg.Grade
    trusted:
      packages: com.msg

  cloud:
    function:
      definition: random;publish;log
    stream:
      bindings:
        random-out-0:
          destination: random-topic-2

        publish-in-0:
          destination: random-topic-2
          group: mypublish-reader-group
        publish-out-0:
          destination: message-topic-2

        log-in-0:
          destination: message-topic-2
          group: message-reader-group

      kafka:
        binder:
          brokers: localhost:9092

推荐答案

我终于发现了这个问题.感觉非常愚蠢,因为问题在于 Model 类中没有默认的无参数构造函数.添加无参数构造函数后,一切正常.

Finally I found out the issue. Feel extremely stupid because the issue was that there was no default no-arg constructor in the Model class. After adding the no-arg constructor, all works as expected.

这篇关于Spring Cloud Streams 应用程序抛出嵌套异常是域模型类的 java.lang.ClassCastException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆