Packaging a custom Java `partitioner.class` plugin for Kafka Connect in Confluent 4.1 + Kafka 1.1?

Question

I've been successfully using a simple custom Partitioner class written in Java for a Kafka Connect sink on Confluent 3.2.x (Kafka 0.10.x). I want to upgrade to Confluent 4.1 (Kafka 1.1) and am experiencing errors.

Kafka Connect's plugin loading mechanism seems to have been changed in CP 3.3.0. Previously, there was just the CLASSPATH option, but with CP 3.3.0+ there is a newer and recommended plugin.path mechanism.

If I try to keep using the legacy CLASSPATH plugin mechanism, when I try to use my plugin, I get:

java.lang.ClassNotFoundException: io.confluent.connect.storage.partitioner.DefaultPartitioner

That is a CP internal class. With the older CP 3.2.x it was available on the classpath; however, with the new classpath isolation in CP >= 3.3.0, I presume it must be provided along with the plugin.

I figured it would be wise to switch to the newer, recommended plugin.path mechanism, so I removed the CLASSPATH entry. The default /etc/kafka/connect-distributed.properties contains plugin.path=/usr/share/java, so I installed my plugin .jar to /usr/share/java/my-custom-partitioner/my-custom-partitioner.jar. I have tried this both with and without the dependency .jar files in that directory.

My plugin seems to get loaded when the Kafka Connect service starts up:

INFO Loading plugin from: /usr/share/java/my-custom-partitioner (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:194)
INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/my-custom-partitioner/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:217)
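
Those two log lines reflect how the isolated mechanism treats plugin.path: each immediate child of a plugin.path entry (a directory or an uber-jar) becomes its own plugin location with its own PluginClassLoader, so my-custom-partitioner and kafka-connect-storage-common end up in different loaders. The following is only a rough sketch of that directory scan using plain java.nio; the class and method names (PluginPathSketch, pluginLocations, jarUrls) are hypothetical, it looks only one level deep, and Kafka's own scan may differ in detail.

```java
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical approximation of a plugin.path scan; not Kafka's implementation.
public class PluginPathSketch {

    /** Immediate children of one plugin.path entry: each directory or .jar is a separate plugin location. */
    public static List<Path> pluginLocations(Path pluginPathEntry) throws IOException {
        try (Stream<Path> children = Files.list(pluginPathEntry)) {
            return children
                    .filter(p -> Files.isDirectory(p) || p.toString().endsWith(".jar"))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    /** Jar URLs that would back one location's isolated loader (one level deep only). */
    public static List<URL> jarUrls(Path location) throws IOException {
        if (!Files.isDirectory(location)) {
            // An uber-jar placed directly in the plugin.path entry is its own location.
            return Collections.singletonList(location.toUri().toURL());
        }
        List<URL> urls = new ArrayList<>();
        try (Stream<Path> files = Files.list(location)) {
            List<Path> jars = files.filter(f -> f.toString().endsWith(".jar")).sorted().collect(Collectors.toList());
            for (Path jar : jars) {
                urls.add(jar.toUri().toURL());
            }
        }
        return urls;
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the plugin.path value from the question; pass another directory to override.
        Path entry = Paths.get(args.length > 0 ? args[0] : "/usr/share/java");
        if (!Files.isDirectory(entry)) {
            System.out.println("No such directory: " + entry);
            return;
        }
        for (Path location : pluginLocations(entry)) {
            // Each location would get its own class loader over these URLs.
            System.out.println(location + " -> " + jarUrls(location));
        }
    }
}
```

Because every subdirectory is scanned as a separate location, a partitioner jar in its own directory is not on the same loader as the storage-common classes it extends.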

When I do:

curl -X PUT -H "Content-Type: application/json" --data-binary "@sink_test_1.json" my-dev-test-vm:8083/connectors/sink-test-1/config

I get:

{"error_code":500,"message":null}

In the Kafka Connect systemd log I can see:

java.lang.NullPointerException
at io.confluent.connect.storage.partitioner.PartitionerConfig.classNameEquals(PartitionerConfig.java:270)
at io.confluent.connect.storage.partitioner.PartitionerConfig.access$000(PartitionerConfig.java:33)
at io.confluent.connect.storage.partitioner.PartitionerConfig$PartitionerClassDependentsRecommender.visible(PartitionerConfig.java:238)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:617)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:625)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:525)
at org.apache.kafka.common.config.ConfigDef.validateAll(ConfigDef.java:508)
at org.apache.kafka.common.config.ConfigDef.validate(ConfigDef.java:490)
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:133)

It's not apparent what is going wrong or why my partitioner class isn't loading correctly.

FYI, I have rebuilt my Java plugin with CP 4.1 + Kafka 1.1 dependencies and made small updates to match API changes such as adding an implementation for getSchemaGeneratorClass to my partitioner class.

Recommended answer

Custom Kafka Connect Partitioner classes will not work via the old CLASSPATH mechanism, and they will not work as standalone plugins under the newer Kafka 0.11.0+ isolated plugin mechanism either.

The only working solution is to copy your custom .jar file containing your custom Kafka Connect Partitioner class into the kafka-connect-storage-common plugin directory at /usr/share/java/kafka-connect-storage-common/. Custom Kafka Connect Partitioner classes must live in that same directory so that they are loaded by the same isolated class loader.
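
Why the directory matters can be illustrated with plain JDK class loaders: a class name is only resolvable through a loader that can reach it, directly or through its parent chain, so a loader that does not share the jar cannot see it. In this minimal sketch, BundledBase is a hypothetical stand-in for a class such as DefaultPartitioner, and the empty URLClassLoader with a null (bootstrap) parent stands in for a plugin loader that has no view of the storage-common jars. Kafka's real PluginClassLoader delegates differently, so treat this as an analogy, not Kafka's code.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class LoaderIsolationSketch {

    // Hypothetical stand-in for a class (e.g. DefaultPartitioner) that only one loader can reach.
    public static class BundledBase {}

    /** True if className can be resolved through the given loader (without initializing it). */
    public static boolean canSee(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String name = BundledBase.class.getName();
        // The loader that actually loaded this class, analogous to "same plugin directory".
        ClassLoader sameLoader = LoaderIsolationSketch.class.getClassLoader();
        // An empty loader whose parent is the bootstrap loader, analogous to "different plugin directory".
        try (URLClassLoader sibling = new URLClassLoader(new URL[0], null)) {
            System.out.println(canSee(sameLoader, name));            // true
            System.out.println(canSee(sibling, name));               // false
            System.out.println(canSee(sibling, "java.lang.String")); // true: JDK classes come from bootstrap
        }
    }
}
```

The false result in the middle is the same kind of failure as the ClassNotFoundException in the question: the class exists, just not on a loader the caller can reach.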

FYI, you can see here that the Kafka 0.11.0+ isolated plugin mechanism only loads subclasses of four specific Java classes, none of which cover Kafka Connect partitioners:

https://github.com/apache/kafka/blob/fdcf75ea326b8e07d8d7c4f5cc14f1e70451bcd4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ClassLoader.java#L279
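
The effect of that restriction can be sketched as a type check during the plugin scan: a class is registered only if it is assignable to one of the scanned base types, and a partitioner is not. The nested interfaces below are hypothetical stand-ins named after Connect's Connector, Converter, HeaderConverter and Transformation types (assumed here to be the four the answer refers to for Kafka 1.1); they are not the real Kafka classes, and pluginKinds is a made-up helper for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PluginScanSketch {

    // Hypothetical stand-ins for Connect's plugin base types (not the real Kafka interfaces).
    public interface Connector {}
    public interface Converter {}
    public interface HeaderConverter {}
    public interface Transformation {}

    // Stand-in for a storage partitioner type, which is not among the scanned types.
    public interface Partitioner {}

    static final List<Class<?>> SCANNED_TYPES =
            Arrays.asList(Connector.class, Converter.class, HeaderConverter.class, Transformation.class);

    /** Simple names of the scanned base types that clazz is assignable to; empty means "not registered". */
    public static List<String> pluginKinds(Class<?> clazz) {
        List<String> kinds = new ArrayList<>();
        for (Class<?> type : SCANNED_TYPES) {
            if (type.isAssignableFrom(clazz)) {
                kinds.add(type.getSimpleName());
            }
        }
        return kinds;
    }

    public static class MySinkConnector implements Connector {}
    public static class MyCustomPartitioner implements Partitioner {}

    public static void main(String[] args) {
        System.out.println(pluginKinds(MySinkConnector.class));     // [Connector]
        System.out.println(pluginKinds(MyCustomPartitioner.class)); // []
    }
}
```

Under this model the partitioner is simply never registered as a plugin of its own, which is consistent with it only working when it shares a directory (and therefore a loader) with the storage-common classes.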

Thanks to cricket_007 for recommending this exact solution: putting custom Kafka Connect partitioner .jar files in the /share/java/kafka-storage-common directory. I learned the hard way exactly why this has to be done and why alternatives don't work.