Apache Kafka:...StringDeserializer 不是 ...Deserializer 的实例 [英] Apache Kafka: ...StringDeserializer is not an instance of ...Deserializer

查看:58
本文介绍了Apache Kafka:...StringDeserializer 不是 ...Deserializer 的实例的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在我的简单应用程序中,我试图实例化一个 KafkaConsumer 我的代码几乎是 javadoc 中的代码(自动偏移提交"):

@Slf4j公共类 MyKafkaConsumer {公共 MyKafkaConsumer() {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer消费者 = 新的 KafkaConsumer<>(props);consumer.subscribe( Arrays.asList("mytopic"));而(真){ConsumerRecords记录 = 消费者.poll(100);for (ConsumerRecord record : 记录)log.info( record.offset() + record.key() + record.value() );//System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}}}

如果我尝试实例化它,我会得到:

org.apache.kafka.common.KafkaException: 构建 kafka 消费者失败在 org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)在 org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)在 org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)在 ...MyKafkaConsumer.(SikomKafkaConsumer.java:23)...引起:org.apache.kafka.common.KafkaException:org.apache.kafka.common.serialization.StringDeserializer 不是 org.apache.kafka.common.serialization.Deserializer 的实例在 org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)在 org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:680)... 48 更多

如何解决这个问题?

解决方案

这可能是 Kafka 类加载的问题.
将类加载器设置为 null 可能会有所帮助.

<预><代码>...线程 currentThread = Thread.currentThread();ClassLoader savedClassLoader = currentThread.getContextClassLoader();currentThread.setContextClassLoader(null);KafkaConsumer消费者 = 新的 KafkaConsumer<>(props);currentThread.setContextClassLoader(savedClassLoader);...

有完整的解释:
https://stackoverflow.com/a/50981469/1673775

In my simple application i am trying to instantiate a KafkaConsumer my code is nearly a copy of the code from javadoc ("Automatic Offset Committing"):

@Slf4j
public class MyKafkaConsumer {

    public MyKafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe( Arrays.asList("mytopic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                log.info( record.offset() + record.key() + record.value() );
                //System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

If i try to instantiate this i get:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)
at ...MyKafkaConsumer.<init>(SikomKafkaConsumer.java:23)
    ...
    Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:680)
        ... 48 more

How to fix this?

解决方案

This might be the problem with Kafka classloading.
Setting classloader to null might help.

...
Thread currentThread = Thread.currentThread();    
ClassLoader savedClassLoader = currentThread.getContextClassLoader();

currentThread.setContextClassLoader(null);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

currentThread.setContextClassLoader(savedClassLoader);
...

There is full explanation:
https://stackoverflow.com/a/50981469/1673775

这篇关于Apache Kafka:...StringDeserializer 不是 ...Deserializer 的实例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆