ClassNotFoundException with Kafka consumer


Question

I have a Kafka consumer application written with Spring Boot 2.0.2. When my listener receives a message, I get the following error:

Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.test.demo.domain.Account]; nested exception is java.lang.ClassNotFoundException: com.test.demo.domain.Account

The class name of the object in the producer is "com.test.demo.domain.Account", but the consumer uses a different package and class name.

When I re-package the consumer's class so that its name matches the producer's, everything works. However, I believe I shouldn't have to do this.

Does anyone know what causes this?

==== UPDATE ====

My producer code:

@Bean
public ProducerFactory<String, Account> accountProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Account> accountKafkaTemplate() {
    ProducerFactory<String, Account> factory = accountProducerFactory();
    return new KafkaTemplate<>(factory);
}

Consumer code:

public ConsumerFactory<String, Account> accountConsumerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
    return new DefaultKafkaConsumerFactory<>(configProps);
}

@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Account> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(accountConsumerFactory());
    factory.setMessageConverter(new StringJsonMessageConverter());
    return factory;
}

Exception:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.kconsumer.accountconsumer.service.AccountConsumer.accountListener(com.kconsumer.accountconsumer.domain.Account)]
Bean [com.kconsumer.accountconsumer.service.AccountConsumer@444cc791]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.kconsumer.accountconsumer.domain.Account] for GenericMessage [payload={"id":"5b079d0b340d9ef2ac9b6f02","name":"test-400","version":0}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6a0a6b0b, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=5b079d0b340d9ef2ac9b6f02, kafka_receivedPartitionId=0, kafka_receivedTopic=ktest, kafka_receivedTimestamp=1527225611820, __TypeId__=[B@2f92e17a}], failedMessage=GenericMessage [payload={"id":"5b079d0b340d9ef2ac9b6f02","name":"test-400","version":0}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@6a0a6b0b, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=5b079d0b340d9ef2ac9b6f02, kafka_receivedPartitionId=0, kafka_receivedTopic=ktest, kafka_receivedTimestamp=1527225611820, __TypeId__=[B@2f92e17a}]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:257)

Recommended answer

If you are using @KafkaListener, use the StringDeserializer or a ByteArrayDeserializer and add a StringJsonMessageConverter @Bean to the application context.

Then...

@KafkaListener(...)
public void listen(Account account) {
    ...
}

...the required type (Account, taken from the listener method's parameter) is passed to the converter.

See the documentation.
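To make the difference concrete, here is a minimal plain-Java sketch. It is not Spring Kafka's implementation; it only mimics the two ways a target type can be chosen. It assumes the producer's JsonSerializer has written its own class name into the __TypeId__ header (that header is visible in the exception above). The names TypeResolutionSketch, ConsumerAccount, Listener, resolveFromHeader and resolveFromListener are hypothetical and exist only for this illustration.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class TypeResolutionSketch {

    // Hypothetical stand-in for the consumer's own domain class.
    public static class ConsumerAccount {
        public String name;
    }

    // Hypothetical stand-in for a listener; only the parameter type matters here.
    public static class Listener {
        public void listen(ConsumerAccount account) { }
    }

    // Header-driven resolution: load whatever class name the producer wrote.
    // This fails when that class does not exist on the consumer's classpath.
    public static Class<?> resolveFromHeader(Map<String, byte[]> headers)
            throws ClassNotFoundException {
        String className = new String(headers.get("__TypeId__"), StandardCharsets.UTF_8);
        return Class.forName(className);
    }

    // Signature-driven resolution: use the type the listener method declares,
    // ignoring any class name sent by the producer.
    public static Class<?> resolveFromListener(Class<?> listenerClass, String methodName) {
        for (Method m : listenerClass.getDeclaredMethods()) {
            if (m.getName().equals(methodName) && m.getParameterCount() == 1) {
                return m.getParameterTypes()[0];
            }
        }
        throw new IllegalArgumentException("No single-argument method named " + methodName);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();
        headers.put("__TypeId__",
                "com.test.demo.domain.Account".getBytes(StandardCharsets.UTF_8));

        try {
            resolveFromHeader(headers);
        } catch (ClassNotFoundException e) {
            System.out.println("Header lookup failed: " + e.getMessage());
        }

        Class<?> target = resolveFromListener(Listener.class, "listen");
        System.out.println("Listener lookup: " + target.getName());
    }
}
```

With the header-driven lookup, the consumer depends on the producer's package layout. With the signature-driven lookup, the consumer only needs a class whose fields match the JSON payload.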

EDIT

You don't need to declare your own listener container factory; Boot will detect the converter and wire it into the auto-configured one for you.

Here is an example:

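import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.converter.StringJsonMessageConverter;

@SpringBootApplication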
public class So50478267Application {

    public static void main(String[] args) {
        SpringApplication.run(So50478267Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Account1> template) {
        return args -> template.send("so50478267", new Account1("foo.inc"));
    }

    @KafkaListener(id = "listen", topics = "so50478267")
    public void listen(Account2 account) {
        System.out.println(account);
    }

    @Bean
    public StringJsonMessageConverter jsonConverter() {
        return new StringJsonMessageConverter();
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so50478267", 1, (short) 1);
    }

    public static class Account1 {

        private final String customer;

        public Account1(String customer) {
            this.customer = customer;
        }

        public String getCustomer() {
            return this.customer;
        }

    }

    public static class Account2 {

        private String customer;

        public Account2() {
            super();
        }

        public String getCustomer() {
            return this.customer;
        }

        public void setCustomer(String customer) {
            this.customer = customer;
        }

        @Override
        public String toString() {
            return "Account2 [customer=" + this.customer + "]";
        }

    }

}

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
