使用 Apache Flink SQL 从 Kafka 消息中获取嵌套字段 [英] Get nested fields from Kafka message using Apache Flink SQL

查看:53
本文介绍了使用 Apache Flink SQL 从 Kafka 消息中获取嵌套字段的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 Apache Flink 1.11 创建一个源表,我可以在其中访问 JSON 消息中的嵌套属性.我可以从根属性中提取值,但我不确定如何访问嵌套对象.

I'm trying to create a source table using Apache Flink 1.11 where I can get access to nested properties in a JSON message. I can pluck values off root properties but I'm unsure how to access nested objects.

documentation 建议它应该是 MAP 类型,但是当我设置它时,我收到以下错误

The documentation suggests that it should be a MAP type but when I set that, I get the following error

: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP

这是我的 SQL

        CREATE TABLE input(
            id VARCHAR,
            title VARCHAR,
            properties MAP
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        )

我的 JSON 看起来像这样:

And my JSON looks something like this:

{
  "id": "message-1",
  "title": "Some Title",
  "properties": {
    "foo": "bar"
  }
}

推荐答案

您可以使用 ROW 提取 JSON 消息中的嵌套字段.您的 DDL 语句类似于:

You can use ROW to extract nested fields in your JSON messages. Your DDL statement would look something like:

CREATE TABLE input(
             id VARCHAR,
             title VARCHAR,
             properties ROW(`foo` VARCHAR)
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        );

这篇关于使用 Apache Flink SQL 从 Kafka 消息中获取嵌套字段的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆