Kafka Connect 不适用于主题策略 [英] Kafka Connect not working with Subject Strategies
问题描述
背景
我编写了几个小的 Kafka Connect 连接器.一个每秒生成随机数据,另一个将其记录在控制台中.它们与 Schema Registry 集成,因此数据是使用 Avro 进行序列化.
I coded a couple of small Kafka Connect connectors. One that just generates random data each second and another that logs it in the console. They're integrated with a Schema Registry so the data is serialized with Avro.
我使用 fast-data-dev Docker 镜像将它们部署到本地 Kafka 环境中Landoop
基本设置工作并每秒产生一条记录的消息
The basic setup works and produces a message each second that is logged
但是,我想更改 主题名称策略.默认生成两个主题:
However, I want to change the subject name strategy. The default one generates two subjects:
${topic}-key
${topic}-value
根据我的用例,我需要生成具有不同架构的事件,这些事件最终会涉及同一主题.因此,我需要的主题名称是:
As per my use case, I'll need to generate events with different schemas that will end up on the same topic. Therefore, the subject names I need are:
${topic}-${keyRecordName}
${topic}-${valueRecordName}
根据文档,我的需求符合 TopicRecordNameStrategy
我尝试了什么
我创建了 avroData
对象来发送值以进行连接:
I create the avroData
object for sending values to connect:
class SampleSourceConnectorTask : SourceTask() {
private lateinit var avroData: AvroData
override fun start(props: Map<String, String>) {
[...]
avroData = AvroData(AvroDataConfig(props))
}
然后使用它来创建 SourceRecord
响应对象
and use it afterwards for creating the SourceRecord
response objects
文档 指出,为了使用Kafka Connect 中的架构注册表我必须在连接器配置中设置一些属性.因此,当我创建它时,我添加了它们:
The documentation states that in order to use the Schema Registry in Kafka Connect I have to set some properties in the connector config. Therefore, when I create it I add them:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
问题
连接器似乎忽略了这些属性并继续使用旧的 ${topic}-key
和 ${topic}-value
主题.
The connector seems to ignore those properties and keeps using the old ${topic}-key
and ${topic}-value
subjects.
问题
Kafka Connect 应该支持不同的主题策略.我设法通过编写自己的 AvroConverter
并硬编码主题策略是我需要的.但是,这看起来不是一个好方法,并且在尝试使用 Sink Kafka 连接器使用数据时也带来了问题.我复制了主题,所以有一个旧名称的版本 (${topic}-key
) 并且它有效
Kafka Connect is supposed to support different subject strategies. I managed to workaround the issue by writing my own version of the AvroConverter
and hardcoding that the subject strategy is the one I need. However, this doesn't look like a good approach and also brought issues when trying to consume the data with the Sink Kafka Connector. I duplicated the subject so there's a version with the old name (${topic}-key
) and it works
为 Kafka Connect 指定主题策略的正确设置是什么?
What is the proper setup for specyfing the subject strategy to Kafka Connect?
推荐答案
您缺少 key.converter
和 value.converter
前缀,以便配置为传递到转接器.所以,而不是:
You're missing the key.converter
and value.converter
prefix, for the config to be passed through to the conveter. So instead of:
key.subject.name.strategy
value.subject.name.strategy
你想要:
key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy
来源 https://docs.confluent.io/current/connect/managing/configuring.html:
要将配置参数传递给键和值转换器,请使用 key.converter.
或 value.converter.
作为前缀,就像在定义默认转换器时在工作器配置中一样.请注意,这些仅在 key.converter
或 value.converter
属性中指定相应的转换器配置时使用.
To pass configuration parameters to key and value converters, prefix them with
key.converter.
orvalue.converter.
as you would in the worker configuration when defining default converters. Note that these are only used when the corresponding converter configuration is specified in thekey.converter
orvalue.converter
properties.
这篇关于Kafka Connect 不适用于主题策略的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!