Spring Kafka @SendTo引发异常:需要KafkaTemplate来支持回复 [英] Spring Kafka @SendTo throws exception : a KafkaTemplate is required to support replies

查看:218
本文介绍了Spring Kafka @SendTo引发异常:需要KafkaTemplate来支持回复的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

根据基于此stackoverflow问题,可能仅通过使用 @SendTo 注释就可以做到这一点,因为spring boot如果上下文中还没有一个模板,也可以自动配置kafka模板."

Based on this stackoverflow question, it should be possible to do this only by using @SendTo annotation beacuse spring boot "also auto configures a kafka template if there is not one already in the context."

但是我无法使它正常工作,我仍然会得到

But I can't get it works, I still get


java.lang.IllegalStateException: a KafkaTemplate is required to support replies
    at org.springframework.util.Assert.state(Assert.java:73) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.kafka.config.MethodKafkaListenerEndpoint.createMessageListener(MethodKafkaListenerEndpoint.java:156) 
...

这是我的监听器方法

    @KafkaListener(topics = "t_invoice")
    @SendTo("t_ledger")
    public List<LedgerEntry> consume(Invoice invoice) throws IOException {
        // do some processing

        var ledgerCredit = new LedgerEntry(invoice.getAmount(), "Credit side", 0, "");
        var ledgerDebit = new LedgerEntry(0, "", invoice.getAmount(), "Debit side");

        return List.of(ledgerCredit, ledgerDebit);
    }

我想念什么?

这是我在使用者上拥有的唯一 @Configuration 文件.消费者与生产者是分开的系统(例如,支付系统产生到kafka的发票,我的程序是会计系统,该系统接收数据并创建分类帐分录)

This my the only @Configuration file I have on consumer. Consumer & producer is separated system (e.g. payment system produce invoice to kafka, my program is accounting system that took data and create ledger entry)

@Configuration
public class KafkaConfig {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        var properties = kafkaProperties.buildConsumerProperties();
        properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "600000");

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

aplication.yml

spring:
  kafka:
    consumer:
      group-id: default-spring-consumer
      auto-offset-reset: earliest

试验错误1

如果我禁用 KafkaConfig ,或在运行期间启用调试,则存在此错误:

If I disable the KafkaConfig, or enable debug during run, this error exists:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.accounting.kafkaconsumer.entity.LedgerEntry to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
Caused by: java.lang.ClassCastException: class com.accounting.kafkaconsumer.entity.LedgerEntry cannot be cast to class java.lang.String (com.accounting.kafkaconsumer.entity.LedgerEntry is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-2.0.1.jar:na]
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-2.0.1.jar:na]
...

试验错误2
如果我禁用 KafkaConfig 并使用此签名(返回String),则可以使用.但这是无法预期的,因为我的配置在 KafkaConfig

Trial-Error 2
If I disable KafkaConfig and using this signature (returning String), it works. But this is not expected, since my configuration is on KafkaConfig

    @KafkaListener(topics = "t_invoice")
    @SendTo("t_ledger")
    public String consume(Invoice invoice) throws IOException {
        // do some processing

        var listLedger = List.of(ledgerCredit, ledgerDebit);
        return objectMapper.writeValueAsString(listLedger);
    }

我认为问题出在这里( KafkaConfig ),因为我创建了 KafkaListenerContainerFactory 的新实例,因此 replyTemplate 为null.如何设置我的 KafkaConfig 的正确方法?

I think the problem is in here (KafkaConfig), since I create new instance of KafkaListenerContainerFactory, the replyTemplate is null. How is the correct way to set up my KafkaConfig?

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

推荐答案

如果您覆盖Boot的自动配置的容器工厂,则不会自动配置它,包括应用模板.定义自己的工厂时,您需要对其进行配置.目前尚不清楚为什么要覆盖Boot的 kafkaListenerContainerFactory bean,因为您所做的只是注入消费者工厂.只需删除该 @Bean 并使用引导程序即可.

If you override Boot's auto-configured container factory then it won't be... auto-configured, including applying the template. When you define your own factory, you are responsible for configuring it. It's not clear why you are overriding Boot's kafkaListenerContainerFactory bean since all you are doing is injecting the consumer factory. Just remove that @Bean and use Boot's.

这篇关于Spring Kafka @SendTo引发异常:需要KafkaTemplate来支持回复的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆