使用Apache Flink SQL从Kafka消息获取嵌套字段 [英] Get nested fields from Kafka message using Apache Flink SQL
本文介绍了使用Apache Flink SQL从Kafka消息获取嵌套字段的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在尝试使用Apache Flink 1.11创建一个源表,在这里我可以访问JSON消息中的嵌套属性.我可以从根属性中删除值,但不确定如何访问嵌套对象.
I'm trying to create a source table using Apache Flink 1.11 where I can get access to nested properties in a JSON message. I can pluck values off root properties but I'm unsure how to access nested objects.
文档建议它应该是 MAP
类型,但是当我设置它时,出现以下错误
The documentation suggests that it should be a MAP
type but when I set that, I get the following error
: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP
这是我的SQL
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties MAP
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
)
我的JSON看起来像这样:
And my JSON looks something like this:
{
"id": "message-1",
"title": "Some Title",
"properties": {
"foo": "bar"
}
}
推荐答案
您可以使用 ROW
在JSON消息中提取嵌套字段.您的DDL语句如下所示:
You can use ROW
to extract nested fields in your JSON messages. Your DDL statement would look something like:
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties ROW(`foo` VARCHAR)
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
);
这篇关于使用Apache Flink SQL从Kafka消息获取嵌套字段的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文