How to transform and extract fields in Kafka sink JDBC connector


Problem description

I am using a 3rd party CDC tool that replicates data from a source database into Kafka topics. An example row is shown below:

{  
   "data":{  
      "USER_ID":{  
         "string":"1"
      },
      "USER_CATEGORY":{  
         "string":"A"
      }
   },
   "beforeData":{  
      "Data":{  
         "USER_ID":{  
            "string":"1"
         },
         "USER_CATEGORY":{  
            "string":"B"
         }
      }
   },
   "headers":{  
      "operation":"UPDATE",
      "timestamp":"2018-05-03T13:53:43.000"
   }
}

What configuration is needed in the sink file in order to extract all the (sub)fields under data and headers and ignore those under beforeData, so that the target table to which the data will be transferred by the Kafka sink contains the following fields:

USER_ID, USER_CATEGORY, operation, timestamp

I went through the transformation list in Confluent's docs, but I was not able to find how to use them to achieve the aforementioned target.

Recommended answer

If you're willing to list specific field names, you can solve this by:

  1. Using a Flatten transform to collapse the nesting (it converts the original structure's paths into dot-delimited names; see the sketch after this list)
  2. Using a ReplaceField transform with renames to make the field names what you want the sink to emit
  3. Using another ReplaceField transform with whitelist to limit the emitted fields to those you select
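
To make the rename targets concrete, here is a sketch of what the record above might look like after the Flatten step. This assumes the {"string": ...} wrappers in the example are just the Avro JSON encoding of nullable strings (so Connect sees plain optional string fields) and that Flatten's default "." delimiter is used:

{
  "data.USER_ID": "1",
  "data.USER_CATEGORY": "A",
  "beforeData.Data.USER_ID": "1",
  "beforeData.Data.USER_CATEGORY": "B",
  "headers.operation": "UPDATE",
  "headers.timestamp": "2018-05-03T13:53:43.000"
}

The renames in step 2 then map data.USER_ID to USER_ID and so on, and the whitelist in step 3 drops the remaining beforeData.* fields.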

For your case it might look like:

  "transforms": "t1,t2,t3",
  "transforms.t1.type": "org.apache.kafka.connect.transforms.Flatten$Value",
  "transforms.t2.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.t2.renames": "data.USER_ID:USER_ID,data.USER_CATEGORY:USER_CATEGORY,headers.operation:operation,headers.timestamp:timestamp",
  "transforms.t3.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.t3.whitelist": "USER_ID,USER_CATEGORY,operation,timestamp",
