使用 Apache Flink SQL 从 Kafka 消息中获取嵌套字段 [英] Get nested fields from Kafka message using Apache Flink SQL
本文介绍了使用 Apache Flink SQL 从 Kafka 消息中获取嵌套字段的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在尝试使用 Apache Flink 1.11 创建一个源表,我可以在其中访问 JSON 消息中的嵌套属性.我可以从根属性中提取值,但我不确定如何访问嵌套对象.
I'm trying to create a source table using Apache Flink 1.11 where I can get access to nested properties in a JSON message. I can pluck values off root properties but I'm unsure how to access nested objects.
documentation 建议它应该是 MAP
类型,但是当我设置它时,我收到以下错误
The documentation suggests that it should be a MAP
type but when I set that, I get the following error
: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP
这是我的 SQL
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties MAP
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
)
我的 JSON 看起来像这样:
And my JSON looks something like this:
{
"id": "message-1",
"title": "Some Title",
"properties": {
"foo": "bar"
}
}
推荐答案
您可以使用 ROW
提取 JSON 消息中的嵌套字段.您的 DDL 语句类似于:
You can use ROW
to extract nested fields in your JSON messages. Your DDL statement would look something like:
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties ROW(`foo` VARCHAR)
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
);
这篇关于使用 Apache Flink SQL 从 Kafka 消息中获取嵌套字段的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文