How to transform and extract fields in Kafka sink JDBC connector
Problem description
I am using a third-party CDC tool that replicates data from a source database into Kafka topics. An example record is shown below:
{
  "data": {
    "USER_ID": {
      "string": "1"
    },
    "USER_CATEGORY": {
      "string": "A"
    }
  },
  "beforeData": {
    "Data": {
      "USER_ID": {
        "string": "1"
      },
      "USER_CATEGORY": {
        "string": "B"
      }
    }
  },
  "headers": {
    "operation": "UPDATE",
    "timestamp": "2018-05-03T13:53:43.000"
  }
}
What configuration is needed in the sink file in order to extract all the (sub)fields under data and headers and ignore those under beforeData, so that the target table into which Kafka Sink transfers the data will contain the following fields:
USER_ID, USER_CATEGORY, operation, timestamp
I went through the transformation list in Confluent's docs, but I was not able to figure out how to use them to achieve this.
Recommended answer
If you're willing to list specific field names, you can solve this by:
- Using a Flatten transform to collapse the nesting (this converts the original structure's paths into dot-delimited names)
- Using a ReplaceField transform with rename to make the field names what you want the sink to emit
- Using another ReplaceField transform with whitelist to limit the emitted fields to those you select
For your case it might look like:
"transforms": "t1,t2,t3",
"transforms.t1.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.t2.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.t2.renames": "data.USER_ID:USER_ID,data.USER_CATEGORY:USER_CATEGORY,headers.operation:operation,headers.timestamp:timestamp",
"transforms.t3.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.t3.whitelist": "USER_ID,USER_CATEGORY,operation,timestamp",
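To see what this chain of transforms does to a record, here is a minimal Python sketch (not Kafka Connect itself) that simulates the three steps on the example value. It assumes the Avro-style union wrappers (the inner {"string": ...} maps) have already been resolved by the converter, so the value arrives as plain nested fields; the flatten helper is a hypothetical stand-in for Flatten$Value with its default "." delimiter.

```python
# Input record as it would look after the converter unwraps the
# Avro union types (assumption; field layout taken from the question).
record = {
    "data": {"USER_ID": "1", "USER_CATEGORY": "A"},
    "beforeData": {"Data": {"USER_ID": "1", "USER_CATEGORY": "B"}},
    "headers": {"operation": "UPDATE", "timestamp": "2018-05-03T13:53:43.000"},
}

def flatten(value, prefix=""):
    """Collapse nested maps into dot-delimited keys, as Flatten$Value does."""
    out = {}
    for k, v in value.items():
        key = f"{prefix}.{k}" if prefix else k
        if isinstance(v, dict):
            out.update(flatten(v, key))
        else:
            out[key] = v
    return out

# t2: the "renames" mapping from the config above.
renames = {
    "data.USER_ID": "USER_ID",
    "data.USER_CATEGORY": "USER_CATEGORY",
    "headers.operation": "operation",
    "headers.timestamp": "timestamp",
}
# t3: the "whitelist" of fields to keep.
whitelist = {"USER_ID", "USER_CATEGORY", "operation", "timestamp"}

flat = flatten(record)                                        # t1: Flatten
renamed = {renames.get(k, k): v for k, v in flat.items()}     # t2: ReplaceField renames
final = {k: v for k, v in renamed.items() if k in whitelist}  # t3: ReplaceField whitelist

print(final)
```

The beforeData.* fields survive the flatten and rename steps under their dot-delimited names, and only drop out at the whitelist step, which is why the third transform is needed.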