Kafka Connect无法使用主题策略 [英] Kafka Connect not working with Subject Strategies
问题描述
上下文
我编码了几个小的 Kafka Connect 连接器.一个每秒仅生成随机数据,另一个每秒将其记录在控制台中.它们与架构注册表集成在一起,因此数据是用 Avro 序列化.
I coded a couple of small Kafka Connect connectors. One that just generates random data each second and another that logs it in the console. They're integrated with a Schema Registry so the data is serialized with Avro.
我使用 fast-data-dev Docker映像将它们部署到本地Kafka环境中Landoop
基本设置有效并每秒产生一条消息,记录下来
The basic setup works and produces a message each second that is logged
However, I want to change the subject name strategy. The default one generates two subjects:
-
$ {topic}-键
-
$ {topic} -value
根据我的用例,我将需要生成具有不同模式的事件,这些事件最终会出现在同一主题上.因此,我需要的主题名称是:
As per my use case, I'll need to generate events with different schemas that will end up on the same topic. Therefore, the subject names I need are:
-
$ {topic}-$ {keyRecordName}
-
$ {topic}-$ {valueRecordName}
我创建了 avroData
对象,用于发送值以进行连接:
I create the avroData
object for sending values to connect:
class SampleSourceConnectorTask : SourceTask() {
private lateinit var avroData: AvroData
override fun start(props: Map<String, String>) {
[...]
avroData = AvroData(AvroDataConfig(props))
}
,然后将其用于创建 SourceRecord
响应对象
and use it afterwards for creating the SourceRecord
response objects
文档指出,为了使用Kafka Connect中的架构注册表我必须在连接器配置中设置一些属性.因此,当我创建它时,请添加它们:
The documentation states that in order to use the Schema Registry in Kafka Connect I have to set some properties in the connector config. Therefore, when I create it I add them:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
问题
连接器似乎忽略了这些属性,并继续使用旧的 $ {topic} -key
和 $ {topic} -value
主题.
The connector seems to ignore those properties and keeps using the old ${topic}-key
and ${topic}-value
subjects.
问题
Kafka Connect应该支持不同的主题策略.我通过编写自己的
Kafka Connect is supposed to support different subject strategies. I managed to workaround the issue by writing my own version of the AvroConverter
and hardcoding that the subject strategy is the one I need. However, this doesn't look like a good approach and also brought issues when trying to consume the data with the Sink Kafka Connector. I duplicated the subject so there's a version with the old name (${topic}-key
) and it works
将主题策略指定给Kafka Connect的正确设置是什么?
What is the proper setup for specyfing the subject strategy to Kafka Connect?
推荐答案
您缺少用于配置的 key.converter
和 value.converter
前缀.传递给对流器.因此,代替:
You're missing the key.converter
and value.converter
prefix, for the config to be passed through to the conveter. So instead of:
key.subject.name.strategy
value.subject.name.strategy
您想要的:
key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy
来源 https://docs.confluent.io/current/connect/managing/configuring.html :
要将配置参数传递给键和值转换器,请像在定义默认转换器时在工作程序配置中一样为它们添加前缀
key.converter.
或value.converter.
..请注意,只有在key.converter
或value.converter
属性中指定了相应的转换器配置时,才使用这些属性.
To pass configuration parameters to key and value converters, prefix them with
key.converter.
orvalue.converter.
as you would in the worker configuration when defining default converters. Note that these are only used when the corresponding converter configuration is specified in thekey.converter
orvalue.converter
properties.
这篇关于Kafka Connect无法使用主题策略的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!