KSQL流-从结构数组获取数据 [英] KSQL streams - Get data from Array of Struct

查看:236
本文介绍了KSQL流-从结构数组获取数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的JSON如下:

{
  "Obj1": {
    "a": "abc",
    "b": "def",
    "c": "ghi"
  },
  "ArrayObj": [
    {
      "key1": "1",
      "Key2": "2",
      "Key3": "3",

    },
    {
      "key1": "4",
      "Key2": "5",
      "Key3": "6",

    },
    {
      "key1": "7",
      "Key2": "8",
      "Key3": "9",

    }
  ]

}

我已经编写了KSQL流以将其转换为AVRO并保存到主题,以便我可以将其推送到JDBC Sink连接器

I have written KSQL streams to convert it to AVRO and save to a topic, So that I can push it to JDBC Sink connector

CREATE STREAM Example1(ArrayObj ARRAY<STRUCT<key1 VARCHAR, Key2 VARCHAR>>,Obj1 STRUCT<a VARCHAR>)WITH(kafka_topic='sample_topic', value_format='JSON');
CREATE STREAM Example_Avro WITH(VALUE_FORMAT='avro') AS SELECT e.ArrayObj[0] FROM Example1 e; 

在Example_Avro中,我只能获得数组中的第一个对象。

In Example_Avro , I can get only first object in a array.

当我在KSQL中单击Example_Avro中的select *时,如何获取如下所示的数据?

How can I get data shown as below, when I hit select * from Example_Avro in KSQL ?

  a    b   key1   key2  key3

  abc  def   1       2     3
  abc  def   4       5     6
  abc  def   7       8     9


推荐答案

测试数据(我在 key3 值后删除了无效的尾部逗号) :

Test data (I removed the invalid trailing commas after key3 value):

ksql> PRINT test4;
Format:JSON
1/9/20 7:45:18 PM UTC , NULL , { "Obj1": { "a": "abc", "b": "def", "c": "ghi" }, "ArrayObj": [ { "key1": "1", "Key2": "2", "Key3": "3" }, { "key1": "4", "Key2": "5", "Key3": "6" }, { "key1": "7", "Key2": "8", "Key3": "9" } ] }

查询:

SELECT OBJ1->A AS A, 
       OBJ1->B AS B, 
       EXPLODE(ARRAYOBJ)->KEY1 AS KEY1,
       EXPLODE(ARRAYOBJ)->KEY2 AS KEY2, 
       EXPLODE(ARRAYOBJ)->KEY3 AS KEY3 
FROM   TEST4 
EMIT CHANGES;

结果:

+-------+-------+------+-------+-------+
|A      |B      |KEY1  |KEY2   |KEY3   |
+-------+-------+------+-------+-------+
|abc    |def    |1     |2      |3      |
|abc    |def    |4     |5      |6      |
|abc    |def    |7     |8      |9      |

在ksqlDB 0.6上测试,其中 EXPLODE 功能增加了。

Tested on ksqlDB 0.6, in which the EXPLODE function was added.

这篇关于KSQL流-从结构数组获取数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆