在Kafka Connect中使用自定义转换器吗? [英] Using a custom converter with Kafka Connect?

查看:231
本文介绍了在Kafka Connect中使用自定义转换器吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试将自定义转换器与Kafka Connect配合使用,但似乎无法正确解决.我希望有人对此有经验,可以帮助我解决问题!

I'm trying to use a custom converter with Kafka Connect and I cannot seem to get it right. I'm hoping someone has experience with this and could help me figure it out !

  • 我的自定义转换器的类路径为custom.CustomStringConverter.

为避免出现任何错误,我的自定义转换器当前只是先前存在的StringConverter的副本/粘贴(当然,当我开始使用它时,它将改变).

to avoid any mistakes, my custom converter is currently just a copy/paste of the pre-existing StringConverter (of course, this will change when I'll get it to work). https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java

我有一个由3个节点组成的kafka连接集群,这些节点正在运行汇合的官方docker镜像(confluentinc/cp-kafka-connect:3.3.0).

I have a kafka connect cluster of 3 nodes, The nodes are running confluent's official docker images (confluentinc/cp-kafka-connect:3.3.0).

每个节点都配置为在其中装有转换器的jar容器中加载(使用docker卷).

Each node is configured to load a jar with my converter in it (using a docker volume).

当连接器启动时,它们会正确加载罐子并找到自定义转换器.确实,这就是我在日志中看到的内容:

When the connectors start, they correctly load the jars and find the custom converter. Indeed, this is what I see in the logs :

[2017-10-10 13:06:46,274] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[...]
[2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and 'CustomString' to plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)

然后我将JSON配置发布到连接器节点之一以创建我的连接器:

I then POST a JSON config to one of the connector nodes to create my connector :

{
  "name": "hdfsSinkCustom",
  "config": {
    "topics": "yellow",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "custom.CustomStringConverter",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
    "topics.dir": "yellow_storage",
    "flush.size": "1",
    "rotate.interval.ms": "1000"
  }
}

并收到以下答复:

{
   "error_code": 400,
   "message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value custom.CustomStringConverter for configuration value.converter: Class custom.CustomStringConverter could not be found.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}

我想念什么?

如果我尝试运行Kafka Connect stadnalone,则错误消息是相同的.

What am I missing ?

If I try running Kafka Connect stadnalone, the error message is the same.

有人面对过这个吗?我想念什么?

Has anybody faced this already ? What am I missing ?

推荐答案

好吧,感谢Kafka Users邮件列表中的Philip Schmitt,我找到了解决方案.

Ok, I found out the solution thanks to Philip Schmitt on the Kafka Users mailing list.

他提到了这个问题: https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-6007 ,这确实是我面临的问题.

He mentioned this issue: https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-6007 , which is indeed the problem I am facing.

引用他:

要对此进行测试,我只是将SMT jar复制到了我正在使用的连接器的文件夹中,并调整了plugin.path属性.

To test this, I simply copied my SMT jar to the folder of the connector I was using and adjusted the plugin.path property.

实际上,我将转换器放在连接器的文件夹中,从而消除了此错误.

Indeed, I got rid of this error by putting the converter in the connector's folder.

我还尝试了其他方法:创建一个自定义连接器,并将该自定义连接器与自定义转换器一起使用,二者均作为插件加载.也可以.

I also tried something else: create a custom connector and use that custom connector with the custom converter, both loaded as plugins. It also works.

摘要:转换器由连接器加载.如果您的连接器是插件,则您的转换器也应该.如果您的连接器不是插件(与kafka connect distrib捆绑在一起),则转换器也不应该是这样.

Summary: converters are loaded by the connector. If your connector is a plugin, your converter should be as well. If you connector is not a plugin (bundled with your kafka connect distrib), your converter should not be either.

这篇关于在Kafka Connect中使用自定义转换器吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆