SMT将通过连接器配置创建Kafka连接器字符串分区键 [英] SMT's to create kafka connector string partition key through connector config

查看:0
本文介绍了SMT将通过连接器配置创建Kafka连接器字符串分区键的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在为PostgreSQL实现一个Kafka连接器(我正在使用debezium Kafka连接器并通过docker运行所有组件)。我需要一个定制分区键,所以我一直使用SMT来实现这一点。但是,我使用的方法创建了一个Struct,并且我需要它是一个字符串。这篇article讲述了如何将分区键设置为int,但我无法访问配置文件来设置适当的转换。目前我的Kafka连接器如下所示

数据-lang="js"数据-隐藏="假"数据-控制台="真"数据-巴贝尔="假">
 curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
    "name": "connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname" : "postgres",
        "database.server.name": "postgres",
        "table.include.list": "public.table",
        "database.history.kafka.bootstrap.servers": "kafka:9092",  
        "database.history.kafka.topic": "schema-changes.table",
        "transforms": "routeRecords,unwrap,createkey",
        "transforms.routeRecords.type":  "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.routeRecords.regex": "(.*)",
        "transforms.routeRecords.replacement": "table",
        "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
        "transforms.createkey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createkey.fields": "id"
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}'

我知道我必须提取列的值,但我只是不确定如何提取。

推荐答案

ValueToKey根据记录的字段列表创建结构。

您还需要一个转换才能从Struct中提取特定的字段,如链接帖子中所示。

org.apache.kafka.connect.transforms.ExtractField$Key

注意:这不设置实际Kafka记录的分区,只设置密钥,然后由生产者散列以获得分区

这篇关于SMT将通过连接器配置创建Kafka连接器字符串分区键的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆