KSQL 流 - 从结构数组中获取数据 [英] KSQL streams - Get data from Array of Struct
本文介绍了KSQL 流 - 从结构数组中获取数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我的 JSON 看起来像:
My JSON looks like:
{
"Obj1": {
"a": "abc",
"b": "def",
"c": "ghi"
},
"ArrayObj": [
{
"key1": "1",
"Key2": "2",
"Key3": "3",
},
{
"key1": "4",
"Key2": "5",
"Key3": "6",
},
{
"key1": "7",
"Key2": "8",
"Key3": "9",
}
]
}
我已经编写了 KSQL 流将其转换为 AVRO 并保存到一个主题,以便我可以将其推送到 JDBC Sink 连接器
I have written KSQL streams to convert it to AVRO and save to a topic, So that I can push it to JDBC Sink connector
CREATE STREAM Example1(ArrayObj ARRAY<STRUCT<key1 VARCHAR, Key2 VARCHAR>>,Obj1 STRUCT<a VARCHAR>)WITH(kafka_topic='sample_topic', value_format='JSON');
CREATE STREAM Example_Avro WITH(VALUE_FORMAT='avro') AS SELECT e.ArrayObj[0] FROM Example1 e;
在 Example_Avro 中,我只能获取数组中的第一个对象.
In Example_Avro , I can get only first object in a array.
当我在 KSQL 中点击 select * from Example_Avro 时,如何获得如下所示的数据?
How can I get data shown as below, when I hit select * from Example_Avro in KSQL ?
a b key1 key2 key3
abc def 1 2 3
abc def 4 5 6
abc def 7 8 9
推荐答案
测试数据(我删除了 key3
值后无效的尾随逗号):
Test data (I removed the invalid trailing commas after key3
value):
ksql> PRINT test4;
Format:JSON
1/9/20 7:45:18 PM UTC , NULL , { "Obj1": { "a": "abc", "b": "def", "c": "ghi" }, "ArrayObj": [ { "key1": "1", "Key2": "2", "Key3": "3" }, { "key1": "4", "Key2": "5", "Key3": "6" }, { "key1": "7", "Key2": "8", "Key3": "9" } ] }
查询:
SELECT OBJ1->A AS A,
OBJ1->B AS B,
EXPLODE(ARRAYOBJ)->KEY1 AS KEY1,
EXPLODE(ARRAYOBJ)->KEY2 AS KEY2,
EXPLODE(ARRAYOBJ)->KEY3 AS KEY3
FROM TEST4
EMIT CHANGES;
结果:
+-------+-------+------+-------+-------+
|A |B |KEY1 |KEY2 |KEY3 |
+-------+-------+------+-------+-------+
|abc |def |1 |2 |3 |
|abc |def |4 |5 |6 |
|abc |def |7 |8 |9 |
在 ksqlDB 0.6 上测试,其中添加了 EXPLODE
函数.
Tested on ksqlDB 0.6, in which the EXPLODE
function was added.
这篇关于KSQL 流 - 从结构数组中获取数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文