创建和使用自定义kafka connect配置提供程序 [英] Creating and using a custom kafka connect configuration provider

查看:463
本文介绍了创建和使用自定义kafka connect配置提供程序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经以分布式模式安装并测试了kafka connect,它现在可以工作,并且可以连接到配置的接收器并从配置的源中读取. 在这种情况下,我开始改进自己的安装.我认为需要立即注意的一个方面是创建连接器的事实,唯一可用的方法是通过REST调用,这意味着我需要通过电线发送我的信息,不受保护. 为了确保这一点,卡夫卡在此处. 这很有用,因为它允许在服务器中设置属性,然后在其余调用中引用它们,如下所示:

I have installed and tested kafka connect in distributed mode, it works now and it connects to the configured sink and reads from the configured source. That being the case, I moved to enhance my installation. The one area I think needs immediate attention is the fact that to create a connector, the only available mean is through REST calls, this means I need to send my information through the wire, unprotected. In order to secure this, kafka introduced the new ConfigProvider seen here. This is helpful as it allows to set properties in the server and then reference them in the rest call, like so:

{
.
.
"property":"${file:/path/to/file:nameOfThePropertyInFile}"
.
.
}

这非常有效,只需在服务器上添加属性文件,然后在distributed.properties文件上添加以下配置:

This works really well, just by adding the property file on the server and adding the following config on the distributed.properties file:

config.providers=file   # multiple comma-separated provider types can be specified here
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

尽管此解决方案有效,但实际上并不能缓解我对安全性的担忧,因为信息现在已经通过有线方式传递,现在已存储在存储库中,文本清晰可见,每个人都可以看到. kafka团队预见到了此问题,并允许客户端生成实现ConfigProvider接口的自己的配置提供程序. 我创建了自己的实现并将其打包在一个jar中,并给它起了最后的名字:

While this solution works, it really does not help to easy my concerns regarding security, as the information now passed from being sent over the wire, to now be seating on a repository, with text on plain sight for everyone to see. The kafka team foresaw this issue and allowed clients to produce their own configuration providers implementing the interface ConfigProvider. I have created my own implementation and packaged in a jar, givin it the sugested final name:

META-INF/services/org.apache.kafka.common.config.ConfigProvider

并在分布式文件中添加了以下条目:

and added the following entry in the distributed file:

config.providers=cust
config.providers.cust.class=com.somename.configproviders.CustConfigProvider

但是我从连接中得到一个错误,指出一个实现ConfigProvider的类,名称为:

However I am getting an error from connect, stating that a class implementing ConfigProvider, with the name:

com.somename.configproviders.CustConfigProvider

找不到

. 我现在不知所措,因为他们站点上的文档并未明确说明如何很好地配置自定义配置提供程序.

could not be found. I am at a loss now, because the documentation on their site is not explicit about how to configure custom config providers very well.

有人在处理类似问题并且可以对此提供一些见识吗?任何帮助将不胜感激.

Has someone worked on a similar issue and could provide some insight into this? Any help would be appreciated.

推荐答案

我最近通过这些步骤来设置自定义ConfigProvider.官方文档含糊不清,令人困惑.

I just went through these to setup a custom ConfigProvider recently. The official doc is ambiguous and confusing.

我创建了自己的实现并将其打包在一个jar中,并给它起了最后的名字: META-INF/services/org.apache.kafka.common.config.ConfigProvider

I have created my own implementation and packaged in a jar, givin it the sugested final name: META-INF/services/org.apache.kafka.common.config.ConfigProvider

您可以随意命名jar的最终名称,但是需要打包为后缀为.jar的jar格式.

You could name the final name of jar whatever you like, but needs to pack to jar format which has .jar suffix.

这是完整的步骤.假设您的自定义ConfigProvider完全限​​定名称为com.my.CustomConfigProvider.MyClass. 1.在目录下创建文件:META-INF/services/org.apache.kafka.common.config.ConfigProvider.文件内容是完整的合格类名称: com.my.CustomConfigProvider.MyClass

Here is the complete step by step. Suppose your custom ConfigProvider fully-qualified name is com.my.CustomConfigProvider.MyClass. 1. create a file under directory: META-INF/services/org.apache.kafka.common.config.ConfigProvider. File content is full qualified class name: com.my.CustomConfigProvider.MyClass

  1. 包括您的源代码,并在META-INF文件夹上方以生成Jar包.如果您使用的是Maven,则文件结构看起来像

将您的最终Jar文件(例如custom-config-provider-1.0.jar)放在Kafka worker插件文件夹下.默认值为/usr/share/java. Kafka worker配置文件中的PLUGIN_PATH.

put your final Jar file, say custom-config-provider-1.0.jar, under the Kafka worker plugin folder. Default is /usr/share/java. PLUGIN_PATH in Kafka worker config file.

也将所有依赖项jar都上传到PLUGIN_PATH.使用jar文件中的META-INFO/MANIFEST.MF文件来配置代码将使用的依赖jar的"ClassPath".

Upload all the dependency jars to PLUGIN_PATH as well. Use the META-INFO/MANIFEST.MF file inside your Jar file to configure the 'ClassPath' of dependent jars that your code will use.

在kafka worker配置文件中,创建两个附加属性:

In kafka worker config file, create two additional properties:

CONNECT_CONFIG_PROVIDERS: 'mycustom', // Alias name of your ConfigProvider
CONNECT_CONFIG_PROVIDERS_MYCUSTOM_CLASS:'com.my.CustomConfigProvider.MyClass',

  1. 重新启动工作人员

  1. Restart workers

通过将POST卷曲到Kafka Restful API来更新连接器配置文件.在连接器配置文件中,可以使用以下语法引用从ConfigProvider:get(path, keys)返回的ConfigData内部的值:

Update your connector config file by curling POST to Kafka Restful API. In Connector config file, you could reference the value inside ConfigData returned from ConfigProvider:get(path, keys) by using the syntax like:

database.password=${mycustom:/path/pass/to/get/method:password}

ConfigData是一个包含{password:123}

ConfigData is a HashMap which contains {password: 123}

  1. 如果仍然看到ClassNotFound异常,则可能是您的ClassPath设置不正确.

注意: •如果您使用的是AWS ECS/EC2,则需要通过设置环境变量来设置工作程序配置文件. •worker配置和连接器配置文件不同.

Note: • If you are using AWS ECS/EC2, you need to set the worker config file by setting the environment variable. • worker config and connector config file are different.

这篇关于创建和使用自定义kafka connect配置提供程序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆