Karaf - Kafka OSGi bundle - Producer issue

Question

I am trying to create a simple bundle for a Kafka producer in Apache Karaf 4.0.3.

Here is my Java code:

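import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
// Serializers are passed as instances to the constructor below instead of by class name.
//props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// This class name is resolved by Kafka at runtime; it is the lookup that fails in the trace below.
props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
Producer<String, String> producer = new KafkaProducer<String, String>(props, new StringSerializer(), new StringSerializer());
// outputData is a String defined elsewhere in my code.
producer.send(new ProducerRecord<String, String>("test", "data", outputData));

producer.close();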

I have declared the corresponding dependency in pom.xml:

<dependency>
    <groupId>org.apache.servicemix.bundles</groupId>
    <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
    <version>0.9.0.0_1</version>
</dependency>

I have also deployed that kafka-clients bundle.

But on starting the producer, I see the following exception on the first attempt:

Exception in thread "pool-135-thread-1" java.lang.ExceptionInInitializerError
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:194)
    .
    .
    .
    at com.google.common.util.concurrent.Futures$6.run(Futures.java:1319)
    at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72)
    at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.clients.producer.internals.DefaultPartitioner for configuration partitioner.class: Class org.apache.kafka.clients.producer.internals.DefaultPartitioner could not be found.
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255)
    at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:78)
    at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:94)
    at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:206)
    ... 12 more

And then this one on every subsequent attempt:

Exception in thread "pool-136-thread-1" java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.clients.producer.ProducerConfig
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:194)
.
.
.
at com.google.common.util.concurrent.Futures$6.run(Futures.java:1319)
at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72)
at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Has anyone encountered a similar issue with the bundle?

Recommended answer

I saw this same problem in 0.9.0. It turned out that a thread context classloader was set, and in that case Kafka uses that classloader to resolve classes. So the thread context classloader should be either:

  • a classloader that can resolve all Kafka-related classes
  • null

I don't know if this is going to bite me, but adding:

Thread.currentThread().setContextClassLoader(null);

did the trick.
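If leaving the context classloader permanently changed on a shared pool thread is a concern, one option is to swap it only for the duration of the call and restore the previous value afterwards. The following is a minimal sketch of that idea using only the JDK. The class name `ContextClassLoaderSwap` and the method `callWith` are hypothetical helpers, not part of Kafka or Karaf, and the action in `main` is only a stand-in for creating the producer.

```java
import java.util.concurrent.Callable;

public final class ContextClassLoaderSwap {

    private ContextClassLoaderSwap() {}

    /**
     * Runs the action with the given thread context classloader (may be null),
     * then restores the previously set loader, even if the action throws.
     * Hypothetical helper for illustration; not a Kafka or Karaf API.
     */
    public static <T> T callWith(ClassLoader loader, Callable<T> action) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            // Put back whatever the thread had before the call.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        // Stand-in for producer creation: report the loader the action sees.
        String seen = callWith(null, () ->
                String.valueOf(Thread.currentThread().getContextClassLoader()));
        System.out.println("inside: " + seen);
        System.out.println("restored: "
                + (Thread.currentThread().getContextClassLoader() == before));
    }
}
```

In the producer code from the question, the action would presumably be `() -> new KafkaProducer<String, String>(props, new StringSerializer(), new StringSerializer())`. Passing `KafkaProducer.class.getClassLoader()` instead of `null` would correspond to the first option in the list above, on the assumption that the kafka-clients bundle's own classloader can resolve the Kafka classes.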
