KSQL 流 - 从结构数组中获取数据 [英] KSQL streams - Get data from Array of Struct

查看:26
本文介绍了KSQL 流 - 从结构数组中获取数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的 JSON 看起来像:

My JSON looks like:

{
  "Obj1": {
    "a": "abc",
    "b": "def",
    "c": "ghi"
  },
  "ArrayObj": [
    {
      "key1": "1",
      "Key2": "2",
      "Key3": "3",

    },
    {
      "key1": "4",
      "Key2": "5",
      "Key3": "6",

    },
    {
      "key1": "7",
      "Key2": "8",
      "Key3": "9",

    }
  ]

}

我已经编写了 KSQL 流将其转换为 AVRO 并保存到一个主题,以便我可以将其推送到 JDBC Sink 连接器

I have written KSQL streams to convert it to AVRO and save to a topic, So that I can push it to JDBC Sink connector

CREATE STREAM Example1(ArrayObj ARRAY<STRUCT<key1 VARCHAR, Key2 VARCHAR>>,Obj1 STRUCT<a VARCHAR>)WITH(kafka_topic='sample_topic', value_format='JSON');
CREATE STREAM Example_Avro WITH(VALUE_FORMAT='avro') AS SELECT e.ArrayObj[0] FROM Example1 e; 

在 Example_Avro 中,我只能获取数组中的第一个对象.

In Example_Avro , I can get only first object in a array.

当我在 KSQL 中点击 select * from Example_Avro 时,如何获得如下所示的数据?

How can I get data shown as below, when I hit select * from Example_Avro in KSQL ?

  a    b   key1   key2  key3

  abc  def   1       2     3
  abc  def   4       5     6
  abc  def   7       8     9

推荐答案

测试数据(我删除了 key3 值后无效的尾随逗号):

Test data (I removed the invalid trailing commas after key3 value):

ksql> PRINT test4;
Format:JSON
1/9/20 7:45:18 PM UTC , NULL , { "Obj1": { "a": "abc", "b": "def", "c": "ghi" }, "ArrayObj": [ { "key1": "1", "Key2": "2", "Key3": "3" }, { "key1": "4", "Key2": "5", "Key3": "6" }, { "key1": "7", "Key2": "8", "Key3": "9" } ] }

查询:

SELECT OBJ1->A AS A, 
       OBJ1->B AS B, 
       EXPLODE(ARRAYOBJ)->KEY1 AS KEY1,
       EXPLODE(ARRAYOBJ)->KEY2 AS KEY2, 
       EXPLODE(ARRAYOBJ)->KEY3 AS KEY3 
FROM   TEST4 
EMIT CHANGES;

结果:

+-------+-------+------+-------+-------+
|A      |B      |KEY1  |KEY2   |KEY3   |
+-------+-------+------+-------+-------+
|abc    |def    |1     |2      |3      |
|abc    |def    |4     |5      |6      |
|abc    |def    |7     |8      |9      |

在 ksqlDB 0.6 上测试,其中添加了 EXPLODE 函数.

Tested on ksqlDB 0.6, in which the EXPLODE function was added.

这篇关于KSQL 流 - 从结构数组中获取数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆