Apache Kafka: ...StringDeserializer is not an instance of ...Deserializer
Problem description
In my simple application I am trying to instantiate a KafkaConsumer. My code is nearly a copy of the code from the Javadoc ("Automatic Offset Committing"):
import java.util.Arrays;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

@Slf4j
public class MyKafkaConsumer {
    public MyKafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // The exception below is thrown by this constructor call
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("mytopic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                log.info(record.offset() + record.key() + record.value());
                //System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
If I try to instantiate this, I get:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)
at ...MyKafkaConsumer.<init>(SikomKafkaConsumer.java:23)
...
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:680)
... 48 more
How can I solve this?
Recommended answer
This might be a problem with Kafka class loading.
Setting the thread's context classloader to null while the consumer is constructed might help.
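...
Thread currentThread = Thread.currentThread();
ClassLoader savedClassLoader = currentThread.getContextClassLoader();
// With no context classloader set, Kafka should fall back to the classloader that loaded its own classes
currentThread.setContextClassLoader(null);
KafkaConsumer<String, String> consumer;
try {
    consumer = new KafkaConsumer<>(props);
} finally {
    // Restore the original classloader even if the constructor throws
    currentThread.setContextClassLoader(savedClassLoader);
}
...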
There is a full explanation here:
https://stackoverflow.com/a/50981469/1673775
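If the workaround is needed in more than one place, the save, swap and restore steps could be wrapped in a small helper. The following is only a sketch: `ContextClassLoaderSwap` and `callWith` are hypothetical names made up for illustration and are not part of Kafka or the JDK. The helper uses only standard `java.lang.Thread` methods and restores the previous classloader in a `finally` block, so it is put back even when the wrapped call throws.

```java
import java.util.function.Supplier;

// Hypothetical helper (not a Kafka or JDK class): runs an action with a
// temporary thread context classloader and always restores the previous one.
public final class ContextClassLoaderSwap {

    private ContextClassLoaderSwap() {
        // static helper, not meant to be instantiated
    }

    public static <T> T callWith(ClassLoader temporary, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader saved = current.getContextClassLoader();
        current.setContextClassLoader(temporary); // null is allowed here
        try {
            return action.get();
        } finally {
            // Runs on normal return and on exceptions alike
            current.setContextClassLoader(saved);
        }
    }

    // Assumed usage with the consumer from the question (needs kafka-clients):
    // KafkaConsumer<String, String> consumer =
    //     ContextClassLoaderSwap.callWith(null, () -> new KafkaConsumer<String, String>(props));
}
```

Whether null is the right temporary classloader depends on the environment; in some setups passing the classloader that loaded the Kafka classes may be the safer choice, so treat this as a starting point and not as a definitive fix.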