How to transform and extract fields in Kafka sink JDBC connector


Question

I am using a 3rd-party CDC tool that replicates data from a source database into Kafka topics. An example row is shown below:

{  
   "data":{  
      "USER_ID":{  
         "string":"1"
      },
      "USER_CATEGORY":{  
         "string":"A"
      }
   },
   "beforeData":{  
      "Data":{  
         "USER_ID":{  
            "string":"1"
         },
         "USER_CATEGORY":{  
            "string":"B"
         }
      }
   },
   "headers":{  
      "operation":"UPDATE",
      "timestamp":"2018-05-03T13:53:43.000"
   }
}

What configuration is needed in the sink file in order to extract all the (sub)fields under data and headers and ignore those under beforeData, so that the target table into which Kafka Sink transfers the data will contain the following fields:

USER_ID, USER_CATEGORY, operation, timestamp

I went through the transformation list in Confluent's docs but was not able to find how to use them to achieve the aforementioned target.

Answer

If you're willing to list specific field names, you can solve this by:

  1. Using a Flatten transform to collapse the nesting (which will convert the original structure's paths into dot-delimited names)
  2. Using a Replace transform with rename to make the field names be what you want the sink to emit
  3. Using another Replace transform with whitelist to limit the emitted fields to those you select

For your case it might look like:

  "transforms": "t1,t2,t3",
  "transforms.t1.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.t2.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.t2.renames": "data.USER_ID:USER_ID,data.USER_CATEGORY:USER_CATEGORY,headers.operation:operation,headers.timestamp:timestamp",
  "transforms.t3.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.t3.whitelist": "USER_ID,USER_CATEGORY,operation,timestamp",
