Kafka Connect无法使用主题策略 [英] Kafka Connect not working with Subject Strategies

查看:98
本文介绍了Kafka Connect无法使用主题策略的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

上下文

我编码了几个小的 Kafka Connect 连接器.一个每秒仅生成随机数据,另一个每秒将其记录在控制台中.它们与架构注册表集成在一起,因此数据是用 Avro 序列化.

I coded a couple of small Kafka Connect connectors. One that just generates random data each second and another that logs it in the console. They're integrated with a Schema Registry so the data is serialized with Avro.

我使用 fast-data-dev Docker映像将它们部署到本地Kafka环境中Landoop

基本设置有效并每秒产生一条消息,记录下来

The basic setup works and produces a message each second that is logged

但是,我想更改

However, I want to change the subject name strategy. The default one generates two subjects:

  • $ {topic}-键
  • $ {topic} -value

根据我的用例,我将需要生成具有不同模式的事件,这些事件最终会出现在同一主题上.因此,我需要的主题名称是:

As per my use case, I'll need to generate events with different schemas that will end up on the same topic. Therefore, the subject names I need are:

  • $ {topic}-$ {keyRecordName}
  • $ {topic}-$ {valueRecordName}

根据文档,我的需求符合 我尝试了什么

我创建了 avroData 对象,用于发送值以进行连接:

I create the avroData object for sending values to connect:

class SampleSourceConnectorTask : SourceTask() {

    private lateinit var avroData: AvroData 

    override fun start(props: Map<String, String>) {
        [...]
        avroData = AvroData(AvroDataConfig(props))
    }

,然后将其用于创建 SourceRecord 响应对象

and use it afterwards for creating the SourceRecord response objects

文档指出,为了使用Kafka Connect中的架构注册表我必须在连接器配置中设置一些属性.因此,当我创建它时,请添加它们:

The documentation states that in order to use the Schema Registry in Kafka Connect I have to set some properties in the connector config. Therefore, when I create it I add them:

name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

问题

连接器似乎忽略了这些属性,并继续使用旧的 $ {topic} -key $ {topic} -value 主题.

The connector seems to ignore those properties and keeps using the old ${topic}-key and ${topic}-value subjects.

问题

Kafka Connect应该支持不同的主题策略.我通过编写自己的

Kafka Connect is supposed to support different subject strategies. I managed to workaround the issue by writing my own version of the AvroConverter and hardcoding that the subject strategy is the one I need. However, this doesn't look like a good approach and also brought issues when trying to consume the data with the Sink Kafka Connector. I duplicated the subject so there's a version with the old name (${topic}-key) and it works

将主题策略指定给Kafka Connect的正确设置是什么?

What is the proper setup for specyfing the subject strategy to Kafka Connect?

推荐答案

您缺少用于配置的 key.converter value.converter 前缀.传递给对流器.因此,代替:

You're missing the key.converter and value.converter prefix, for the config to be passed through to the conveter. So instead of:

key.subject.name.strategy
value.subject.name.strategy

您想要的:

key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy

来源 https://docs.confluent.io/current/connect/managing/configuring.html :

要将配置参数传递给键和值转换器,请像在定义默认转换器时在工作程序配置中一样为它们添加前缀 key.converter. value.converter...请注意,只有在 key.converter value.converter 属性中指定了相应的转换器配置时,才使用这些属性.

To pass configuration parameters to key and value converters, prefix them with key.converter. or value.converter. as you would in the worker configuration when defining default converters. Note that these are only used when the corresponding converter configuration is specified in the key.converter or value.converter properties.

这篇关于Kafka Connect无法使用主题策略的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆