Kafka Connect 不适用于主题策略 [英] Kafka Connect not working with Subject Strategies

查看:35
本文介绍了Kafka Connect 不适用于主题策略的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

背景

我编写了几个小的 Kafka Connect 连接器.一个每秒生成随机数据,另一个将其记录在控制台中.它们与 Schema Registry 集成,因此数据是使用 Avro 进行序列化.

I coded a couple of small Kafka Connect connectors. One that just generates random data each second and another that logs it in the console. They're integrated with a Schema Registry so the data is serialized with Avro.

我使用 fast-data-dev Docker 镜像将它们部署到本地 Kafka 环境中Landoop

基本设置工作并每秒产生一条记录的消息

The basic setup works and produces a message each second that is logged

但是,我想更改 主题名称策略.默认生成两个主题:

However, I want to change the subject name strategy. The default one generates two subjects:

  • ${topic}-key
  • ${topic}-value

根据我的用例,我需要生成具有不同架构的事件,这些事件最终会涉及同一主题.因此,我需要的主题名称是:

As per my use case, I'll need to generate events with different schemas that will end up on the same topic. Therefore, the subject names I need are:

  • ${topic}-${keyRecordName}
  • ${topic}-${valueRecordName}

根据文档,我的需求符合 TopicRecordNameStrategy

我尝试了什么

我创建了 avroData 对象来发送值以进行连接:

I create the avroData object for sending values to connect:

class SampleSourceConnectorTask : SourceTask() {

    private lateinit var avroData: AvroData 

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }

然后使用它来创建 SourceRecord 响应对象

and use it afterwards for creating the SourceRecord response objects

文档 指出,为了使用Kafka Connect 中的架构注册表我必须在连接器配置中设置一些属性.因此,当我创建它时,我添加了它们:

The documentation states that in order to use the Schema Registry in Kafka Connect I have to set some properties in the connector config. Therefore, when I create it I add them:

name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

问题

连接器似乎忽略了这些属性并继续使用旧的 ${topic}-key${topic}-value 主题.

The connector seems to ignore those properties and keeps using the old ${topic}-key and ${topic}-value subjects.

问题

Kafka Connect 应该支持不同的主题策略.我设法通过编写自己的 AvroConverter 并硬编码主题策略是我需要的.但是,这看起来不是一个好方法,并且在尝试使用 Sink Kafka 连接器使用数据时也带来了问题.我复制了主题,所以有一个旧名称的版本 (${topic}-key) 并且它有效

Kafka Connect is supposed to support different subject strategies. I managed to workaround the issue by writing my own version of the AvroConverter and hardcoding that the subject strategy is the one I need. However, this doesn't look like a good approach and also brought issues when trying to consume the data with the Sink Kafka Connector. I duplicated the subject so there's a version with the old name (${topic}-key) and it works

为 Kafka Connect 指定主题策略的正确设置是什么?

What is the proper setup for specyfing the subject strategy to Kafka Connect?

推荐答案

您缺少 key.convertervalue.converter 前缀,以便配置为传递到转接器.所以,而不是:

You're missing the key.converter and value.converter prefix, for the config to be passed through to the conveter. So instead of:

key.subject.name.strategy
value.subject.name.strategy

你想要:

key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy

来源 https://docs.confluent.io/current/connect/managing/configuring.html:

要将配置参数传递给键和值转换器,请使用 key.converter.value.converter. 作为前缀,就像在定义默认转换器时在工作器配置中一样.请注意,这些仅在 key.convertervalue.converter 属性中指定相应的转换器配置时使用.

To pass configuration parameters to key and value converters, prefix them with key.converter. or value.converter. as you would in the worker configuration when defining default converters. Note that these are only used when the corresponding converter configuration is specified in the key.converter or value.converter properties.

这篇关于Kafka Connect 不适用于主题策略的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆