Kafka Connect SMT 添加 Kafka 标头字段 [英] Kafka Connect SMT to add Kafka header fields

查看:23
本文介绍了Kafka Connect SMT 添加 Kafka 标头字段的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要找到或编写一个将向请求添加标头字段的 SMT.请求缺少一些类型字段,我想添加它们.

I need to find or write an SMT that will add header fields to a request. The request is missing some type fields and I want to add them.

您究竟如何在 SMT 中添加标题,我所看到的只是如下所示的记录转换,但如果我想更改标题或向其中添加字段怎么办?

How exactly do you add a header within an SMT all I have seen are just record transforms like below but what if its the header I want to change or add a field to?

   private R applySchemaless(R record) {

   final Map<String, Object> value = requireMap(operatingValue(record), PURPOSE);
  // record.headers.add(Header)  but how do I define the header
  // or record.headers.add(String, Schema) but I am not sure how to define Schema? 
  final Map<String, Object> updatedValue = new HashMap<>(value);

  updatedValue.put(fieldName, getRandomUuid());
  

  return newRecord(record, null, updatedValue);
  
}

推荐答案

这应该可行

Headers headers = new ConnectHeaders();
headers.add(myKey, myValue, mySchema);
headers.forEach(h -> record.headers().add(h));

ConnectHeaders 信息可以在这里找到 - https://kafka.apache.org/25/javadoc/org/apache/kafka/connect/header/Headers.html

ConnectHeaders info can be found here - https://kafka.apache.org/25/javadoc/org/apache/kafka/connect/header/Headers.html

这篇关于Kafka Connect SMT 添加 Kafka 标头字段的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆