spring-cloud-stream message conversion exception

Problem description

While upgrading one of our services to spring-cloud-stream 2.0.0.RC3, we encountered an exception when trying to consume a message produced by a service that uses an older version of spring-cloud-stream (Ditmars.RELEASE):

    ERROR 31241 --- [container-4-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.watercorp.messaging.types.incoming.UsersDeletedMessage] for GenericMessage [payload=byte[371], headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@62029d0d, kafka_timestampType=CREATE_TIME, message_id=1645508761, id=f4e947de-22e6-b629-229b-4fa961c73f2d, type=USERS_DELETED, kafka_receivedPartitionId=4, contentType=text/plain, kafka_receivedTopic=user, kafka_receivedTimestamp=1521641760698, timestamp=1521641772477}], failedMessage=GenericMessage [payload=byte[371], headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@62029d0d, kafka_timestampType=CREATE_TIME, message_id=1645508761, id=f4e947de-22e6-b629-229b-4fa961c73f2d, type=USERS_DELETED, kafka_receivedPartitionId=4, contentType=text/plain, kafka_receivedTopic=user, kafka_receivedTimestamp=1521641760698, timestamp=1521641772477}]
        at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144)
        at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137)
        at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109)
        at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:164)
        at org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler.handleRequestMessage(DispatchingStreamListenerMessageHandler.java:87)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
        at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
        at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:70)
        at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387)
        at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:745)

It looks like the cause is that the contentType header sent with the message is text/plain, although it should be application/json.
The producer configuration:

spring:
  cloud:
      stream:
        kafka:
          binder:
            brokers: kafka
            defaultBrokerPort: 9092
            zkNodes: zookeeper
            defaultZkPort: 2181
            minPartitionCount: 2
            replicationFactor: 1
            autoCreateTopics: true
            autoAddPartitions: true
            headers: type,message_id
            requiredAcks: 1
            configuration:
              "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
          bindings:
            user-output:
              producer:
                sync: true
                configuration:
                  retries: 10000
        default:
          binder: kafka
          contentType: application/json
          group: user-service
          consumer:
            maxAttempts: 1
          producer:
            partitionKeyExtractorClass: com.watercorp.user_service.messaging.PartitionKeyExtractor
        bindings:
          user-output:
            destination: user
            producer:
              partitionCount: 5

Consumer configuration:

spring:
  cloud:
      stream:
        kafka:
          binder:
            brokers: kafka
            defaultBrokerPort: 9092
            minPartitionCount: 2
            replicationFactor: 1
            autoCreateTopics: true
            autoAddPartitions: true
            headers: type,message_id
            requiredAcks: 1
            configuration:
              "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
          bindings:
            user-input:
              consumer:
                autoRebalanceEnabled: true
                autoCommitOnError: true
                enableDlq: true            
        default:
          binder: kafka
          contentType: application/json
          group: enrollment-service
          consumer:
            maxAttempts: 1
            headerMode: embeddedHeaders
          producer:
            partitionKeyExtractorClass: com.watercorp.messaging.PartitionKeyExtractor
            headerMode: embeddedHeaders
        bindings:          
          user-input:
            destination: user
            consumer:
              concurrency: 5
              partitioned: true          

Consumer @StreamListener:

    @StreamListener(target = UserInput.INPUT, condition = "headers['type']=='" + USERS_DELETED + "'")
    public void handleUsersDeletedMessage(@Valid UsersDeletedMessage usersDeletedMessage, @Header(value = "kafka_receivedPartitionId",
            required = false) String partitionId, @Header(value = KAFKA_TOPIC_HEADER_NAME, required = false) String topic, @Header(MESSAGE_ID_HEADER_NAME) String messageId) throws Throwable {
        logger.info(String.format("Received users deleted message message, message id: %s topic: %s partition: %s", messageId, topic, partitionId));
        handleMessageWithRetry(_usersDeletedMessageHandler, usersDeletedMessage, messageId, topic);
    }
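
To make the suspected cause concrete, here is a minimal, framework-free Java sketch of content-type-driven conversion. It is only an illustration under stated assumptions: the `Converter` class, the `convert` method and the `UsersDeleted` stand-in type are hypothetical and are not Spring's actual implementation. The idea it models is that a converter is chosen by the `contentType` header, so a JSON payload labelled `text/plain` finds no converter that can produce the listener's POJO type.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ContentTypeSketch {

    // Hypothetical stand-in for the listener's payload type.
    static final class UsersDeleted {
        final String rawJson;
        UsersDeleted(String rawJson) { this.rawJson = rawJson; }
    }

    // Simplified converter: applies to one content type and one target type.
    static final class Converter {
        final String contentType;
        final Class<?> targetType;
        final Function<String, Object> mapper;

        Converter(String contentType, Class<?> targetType, Function<String, Object> mapper) {
            this.contentType = contentType;
            this.targetType = targetType;
            this.mapper = mapper;
        }

        boolean supports(String headerContentType, Class<?> requested) {
            return contentType.equals(headerContentType) && targetType.isAssignableFrom(requested);
        }
    }

    // Assumption: text/plain only yields String; application/json yields the POJO.
    // The "JSON" mapper just wraps the raw text; a real one would parse it.
    static final List<Converter> CONVERTERS = Arrays.asList(
            new Converter("text/plain", String.class, s -> s),
            new Converter("application/json", UsersDeleted.class, UsersDeleted::new));

    static Object convert(byte[] payload, String contentType, Class<?> target) {
        String text = new String(payload, StandardCharsets.UTF_8);
        for (Converter c : CONVERTERS) {
            if (c.supports(contentType, target)) {
                return c.mapper.apply(text);
            }
        }
        // No converter matched the header/target pair.
        throw new IllegalStateException(
                "Cannot convert from [[B] to [" + target.getName() + "] with contentType=" + contentType);
    }

    public static void main(String[] args) {
        byte[] body = "{\"ids\":[1,2]}".getBytes(StandardCharsets.UTF_8);
        // Correct header: the JSON converter is selected.
        System.out.println(convert(body, "application/json", UsersDeleted.class).getClass().getSimpleName());
        try {
            // Same bytes, wrong header: no converter can produce the POJO.
            convert(body, "text/plain", UsersDeleted.class);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this simplified model the payload bytes are identical in both calls; only the header decides whether conversion succeeds, which matches the symptom in the log above.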

Solution

This is a bug in RC3; recently fixed on master; it will be in the GA release due at the end of the month. In the meantime, can you try with 2.0.0.BUILD-SNAPSHOT?

I was able to reproduce the problem and using the snapshot fixed it for me...

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
        <version>2.0.0.BUILD-SNAPSHOT</version>
    </dependency>

EDIT

For completeness:

Ditmars producer

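import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication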
@EnableBinding(Source.class)
public class So49409104Application {

    public static void main(String[] args) {
        SpringApplication.run(So49409104Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        return args -> {
            Foo foo = new Foo();
            foo.setBar("bar");
            output.send(new GenericMessage<>(foo));
        };
    }


    public static class Foo {

        private String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

and

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: so49409104a
          content-type: application/json
          producer:
            header-mode: embeddedHeaders

Elmhurst consumer:

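import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication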
@EnableBinding(Sink.class)
public class So494091041Application {

    public static void main(String[] args) {
        SpringApplication.run(So494091041Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(Foo foo) {
        System.out.println(foo);
    }

    public static class Foo {

        private String bar;

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

and

spring:
  cloud:
    stream:
      bindings:
        input:
          group: so49409104
          destination: so49409104a
          consumer:
            header-mode: embeddedHeaders
          content-type: application/json

Result:

Foo [bar=bar]

The header-mode setting is required because the default in 2.0 is native, now that Kafka supports headers natively.
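
As a rough illustration of why the header mode matters, the following plain-Java sketch contrasts the two ideas. It is conceptual only: the byte layout used by `embed` and `extract` is a made-up length-prefixed format, not the binder's real wire format, and `readNative` is a hypothetical stand-in for a consumer that takes headers from the record's own header section instead of unpacking them from the value.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class EmbeddedHeadersSketch {

    static final class Decoded {
        final Map<String, String> headers;
        final byte[] payload;
        Decoded(Map<String, String> headers, byte[] payload) {
            this.headers = headers;
            this.payload = payload;
        }
    }

    // Hypothetical format: header count, then name/value pairs, then the payload.
    static byte[] embed(Map<String, String> headers, byte[] payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(headers.size());
            for (Map.Entry<String, String> e : headers.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
            out.write(payload);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Embedded mode: unpack the headers from the record value.
    static Decoded extract(byte[] recordValue) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(recordValue));
            int count = in.readInt();
            Map<String, String> headers = new LinkedHashMap<>();
            for (int i = 0; i < count; i++) {
                String name = in.readUTF();
                String value = in.readUTF();
                headers.put(name, value);
            }
            byte[] payload = new byte[in.available()];
            in.readFully(payload);
            return new Decoded(headers, payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Native mode: headers come from the record itself; the value is used as-is.
    static Decoded readNative(byte[] recordValue, Map<String, String> recordHeaders) {
        return new Decoded(recordHeaders, recordValue);
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("contentType", "application/json");
        byte[] body = "{\"bar\":\"bar\"}".getBytes(StandardCharsets.UTF_8);
        byte[] value = embed(headers, body);

        System.out.println(extract(value).headers);
        // An older producer sets no native record headers, so nothing is found here.
        System.out.println(readNative(value, new LinkedHashMap<>()).headers);
    }
}
```

Under these assumptions, a consumer reading in native mode would see neither the contentType header nor a clean JSON payload from a producer that embeds its headers, which is why both sides need to agree on the mode.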
