Kafka Connect: How to fetch nested fields from a Struct


Question

I am using Kafka Connect to implement a Kafka-to-Elasticsearch connector.

The producer sends a complex JSON document to a Kafka topic, and my connector code consumes it and persists it to Elasticsearch. The connector receives the data as a Struct (https://kafka.apache.org/0100/javadoc/org/apache/kafka/connect/data/Struct.html).

I can read the Struct's field values at the top level of the JSON, but I cannot fetch values from the nested JSON objects.

{
    "after": {
        "test.test.employee.Value": {
            "id": 5671111,
            "name": {
                "string": "abc"
            }
        }
    },
    "op": "u",
    "ts_ms": {
        "long": 1474892835943
    }
}

I am able to parse "op", but not "test.test.employee.Value".

Struct afterStruct = struct.getStruct("after"); // returns the expected value
String opValue = struct.getString("op"); // returns "u" as expected

// Throws org.apache.kafka.connect.errors.DataException: test.test.employee.Value is not a valid field name
Struct valueStruct = afterStruct.getStruct("test.test.employee.Value");

Recommended answer

Struct.getStruct does not natively support nested access using dot notation; it looks up a single field by its exact name.

Your schema looks like it comes from Debezium. If so, Debezium provides its own "unwrap" message transformer.

If you control the extractor code, one option is the code I wrote for the Confluent Kafka Connect Storage project, which you might find useful. It takes a Struct or a Map object (see below).

Otherwise, you might want to try adding Landoop's KCQL plugin to your Connect classpath.

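import java.util.Map;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

  // Resolves a dot-separated field path by descending one level per segment.
  public static Object getNestedFieldValue(Object structOrMap, String fieldName) {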
    // validate(structOrMap, fieldName); // can ignore this

    try {
      Object innermost = structOrMap;
      // Iterate down to final struct
      for (String name : fieldName.split("\\.")) {
        innermost = getField(innermost, name);
      }
      return innermost;
    } catch (DataException e) {
      throw new DataException(
            String.format("The field '%s' does not exist in %s.", fieldName, structOrMap),
            e
      );
    }
  }

  public static Object getField(Object structOrMap, String fieldName) {
    // validate(structOrMap, fieldName);

    Object field;
    if (structOrMap instanceof Struct) {
      field = ((Struct) structOrMap).get(fieldName);
    } else if (structOrMap instanceof Map) {
      field = ((Map<?, ?>) structOrMap).get(fieldName);
      if (field == null) {
        throw new DataException(String.format("Unable to find nested field '%s'", fieldName));
      }
      return field;
    } else {
      throw new DataException(String.format(
            "Argument not a Struct or Map. Cannot get field '%s' from %s.",
            fieldName,
            structOrMap
      ));
    }
    if (field == null) {
      throw new DataException(
            String.format("The field '%s' does not exist in %s.", fieldName, structOrMap));
    }
    return field;
  }
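
To see how the dot-path traversal above behaves, here is a minimal, standard-library-only sketch of the same idea. It is not the Confluent code: the class name NestedFieldDemo is hypothetical, nested Maps stand in for a Struct, and IllegalArgumentException stands in for DataException so that the example runs without the Kafka Connect jars. It also illustrates one limitation that follows from splitting on ".": a key that itself contains dots (such as "test.test.employee.Value") is broken into segments and cannot be addressed this way.

```java
import java.util.Map;

// Hypothetical demo class; Maps stand in for Kafka Connect Structs.
final class NestedFieldDemo {

  // Walks a dot-separated path through nested Maps, one level per segment.
  static Object getNestedFieldValue(Map<?, ?> root, String path) {
    Object current = root;
    for (String name : path.split("\\.")) {
      if (!(current instanceof Map)) {
        throw new IllegalArgumentException(
            String.format("Cannot get field '%s' from non-Map value %s", name, current));
      }
      current = ((Map<?, ?>) current).get(name);
      if (current == null) {
        throw new IllegalArgumentException(
            String.format("Unable to find nested field '%s' in path '%s'", name, path));
      }
    }
    return current;
  }

  public static void main(String[] args) {
    // Record loosely shaped like the question's payload, without the dotted type name.
    Map<String, Object> name = Map.of("string", "abc");
    Map<String, Object> after = Map.of("id", 5671111, "name", name);
    Map<String, Object> record = Map.of("op", "u", "after", after);

    System.out.println(getNestedFieldValue(record, "op"));
    System.out.println(getNestedFieldValue(record, "after.id"));
    System.out.println(getNestedFieldValue(record, "after.name.string"));

    // A key containing dots is split into segments, so the lookup fails.
    Map<String, Object> dotted = Map.of("test.test.employee.Value", Map.of("id", 1));
    try {
      getNestedFieldValue(dotted, "test.test.employee.Value");
    } catch (IllegalArgumentException e) {
      System.out.println("lookup failed: " + e.getMessage());
    }
  }
}
```

If field names in your data can contain dots, a path given as a list of segments (rather than a single dot-joined string) would avoid the ambiguity; that variation is left out here to keep the sketch close to the answer's approach.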
