在Kafka Connect中使用自定义转换器吗? [英] Using a custom converter with Kafka Connect?
问题描述
我正在尝试将自定义转换器与Kafka Connect配合使用,但似乎无法正确解决.我希望有人对此有经验,可以帮助我解决问题!
I'm trying to use a custom converter with Kafka Connect and I cannot seem to get it right. I'm hoping someone has experience with this and could help me figure it out !
-
我的自定义转换器的类路径为
custom.CustomStringConverter
.
为避免出现任何错误,我的自定义转换器当前只是先前存在的StringConverter的副本/粘贴(当然,当我开始使用它时,它将改变).
to avoid any mistakes, my custom converter is currently just a copy/paste of the pre-existing StringConverter (of course, this will change when I'll get it to work). https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
我有一个由3个节点组成的kafka连接集群,这些节点正在运行汇合的官方docker镜像(confluentinc/cp-kafka-connect:3.3.0
).
I have a kafka connect cluster of 3 nodes, The nodes are running confluent's official docker images (confluentinc/cp-kafka-connect:3.3.0
).
每个节点都配置为在其中装有转换器的jar容器中加载(使用docker卷).
Each node is configured to load a jar with my converter in it (using a docker volume).
当连接器启动时,它们会正确加载罐子并找到自定义转换器.确实,这就是我在日志中看到的内容:
When the connectors start, they correctly load the jars and find the custom converter. Indeed, this is what I see in the logs :
[2017-10-10 13:06:46,274] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[...]
[2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and 'CustomString' to plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
然后我将JSON配置发布到连接器节点之一以创建我的连接器:
I then POST a JSON config to one of the connector nodes to create my connector :
{
"name": "hdfsSinkCustom",
"config": {
"topics": "yellow",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "custom.CustomStringConverter",
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
"topics.dir": "yellow_storage",
"flush.size": "1",
"rotate.interval.ms": "1000"
}
}
并收到以下答复:
{
"error_code": 400,
"message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value custom.CustomStringConverter for configuration value.converter: Class custom.CustomStringConverter could not be found.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}
我想念什么?
如果我尝试运行Kafka Connect stadnalone,则错误消息是相同的.
What am I missing ?
If I try running Kafka Connect stadnalone, the error message is the same.
有人面对过这个吗?我想念什么?
Has anybody faced this already ? What am I missing ?
推荐答案
好吧,感谢Kafka Users邮件列表中的Philip Schmitt,我找到了解决方案.
Ok, I found out the solution thanks to Philip Schmitt on the Kafka Users mailing list.
他提到了这个问题: https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-6007 ,这确实是我面临的问题.
He mentioned this issue: https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-6007 , which is indeed the problem I am facing.
引用他:
要对此进行测试,我只是将SMT jar复制到了我正在使用的连接器的文件夹中,并调整了plugin.path属性.
To test this, I simply copied my SMT jar to the folder of the connector I was using and adjusted the plugin.path property.
实际上,我将转换器放在连接器的文件夹中,从而消除了此错误.
Indeed, I got rid of this error by putting the converter in the connector's folder.
我还尝试了其他方法:创建一个自定义连接器,并将该自定义连接器与自定义转换器一起使用,二者均作为插件加载.也可以.
I also tried something else: create a custom connector and use that custom connector with the custom converter, both loaded as plugins. It also works.
摘要:转换器由连接器加载.如果您的连接器是插件,则您的转换器也应该.如果您的连接器不是插件(与kafka connect distrib捆绑在一起),则转换器也不应该是这样.
Summary: converters are loaded by the connector. If your connector is a plugin, your converter should be as well. If you connector is not a plugin (bundled with your kafka connect distrib), your converter should not be either.
这篇关于在Kafka Connect中使用自定义转换器吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!