How to parse dynamic JSON with dynamic keys inside it in Scala

Problem description

I am trying to parse a JSON structure that is dynamic in nature and load it into a database, but I am having difficulty where the JSON has dynamic keys inside it. I have tried using the explode function, but it didn't help. A mostly similar problem is described here: How to parse a dynamic JSON key in a Nested JSON result? Below is my sample JSON:

    {
        "_id": {
            "planId": "5f34dab0c661d8337097afb9",
            "version": {
                "$numberLong": "1"
            },
            "period": {
                "name": "3Q20",
                "startDate": 20200629,
                "endDate": 20200927
            },
            "line": "b443e9c0-fafc-4791-87c9-8e32339c7f3c",
            "channelId": "G7k5_-HWRIuF0-afe7q-rQ"
        },
        "unitRates": {
            "units": {
                "$numberLong": "0"
            },
            "rate": 0.0,
            "rcRate": 0.0
        },
        "demoValues": {
            "66": {
                "cpm": 0.0,
                "cpp": 0,
                "vpvh": 0.0,
                "imps": 0.0,
                "rcImps": 0.0,
                "ue": 0.0,
                "grps": 0.0,
                "demoId": "66"
            },
            "63": {
                "cpm": 0.0,
                "cpp": 0,
                "vpvh": 0.0,
                "imps": 0.0,
                "rcImps": 0.0,
                "ue": 0.0,
                "grps": 0.0,
                "demoId": "63"
            },
            "21": {
                "cpm": 0.0,
                "cpp": 0,
                "vpvh": 0.0,
                "imps": 0.0,
                "rcImps": 0.0,
                "ue": 0.0,
                "grps": 0.0,
                "demoId": "21"
            }
        },
        "hh-imps": 0.0
    }

Below is my Scala code:

import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}

object ParseDynamic_v2 {
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\hadoop")
    val spark = SparkSession
      .builder
      .appName("ConfluentConsumer")
      .master("local[4]")
      .getOrCreate()

    import spark.implicits._
    val jsonStringDs = spark.createDataset[String](
      Seq(
        """{"_id" : {"planId" : "5f34dab0c661d8337097afb9","version" : {"$numberLong" : "1"},"period" : {"name" : "3Q20","startDate" : 20200629,"endDate" : 20200927},"line" : "b443e9c0-fafc-4791-87c9-8e32339c7f3c","channelId" : "G7k5_-HWRIuF0-afe7q-rQ"},"unitRates" : {"units" : {"$numberLong" : "0"},"rate" : 0.0,"rcRate" : 0.0},"demoValues" : {"66" : {"cpm" : 0.0,"cpp" : 0,"vpvh" : 0.0,"imps" : 0.0,"rcImps" : 0.0,"ue" : 0.0,"grps" : 0.0,"demoId" : "66"},"63" : {"cpm" : 0.0,"cpp" : 0,"vpvh" : 0.0,"imps" : 0.0,"rcImps" : 0.0,"ue" : 0.0,"grps" : 0.0,"demoId" : "63"},"21" : {"cpm" : 0.0,"cpp" : 0,"vpvh" : 0.0,"imps" : 0.0,"rcImps" : 0.0,"ue" : 0.0,"grps" : 0.0,"demoId" : "21"}},"hh-imps" : 0.0}"""
      ))

    jsonStringDs.show()
    val df = spark.read.json(jsonStringDs)
    df.show(false)

    // Schema inference reads "demoValues" as a struct whose field names are the
    // dynamic keys ("66", "63", "21"), so this yields one struct column per key
    val app = df.select("demoValues.*")
    app.createOrReplaceTempView("app")
    app.printSchema()
    app.show(false)

    val verticaProperties: Map[String, String] = Map(
      "db" -> "dbname", // Database name
      "user" -> "user", // Database username
      "password" -> "****", // Password
      "table" -> "tablename", // Vertica table name
      "dbschema" -> "public", // Vertica schema in which the table resides
      "host" -> "localhost", // Host on which Vertica is running
      "hdfs_url" -> "hdfs://localhost:8020/user/hadoop/planheader/", // HDFS directory where the intermediate ORC files are kept before being sent to Vertica
      "web_hdfs_url" -> "webhdfs://localhost:50070/user/hadoop/planheader/"
    )

    val verticaDataSource = "com.vertica.spark.datasource.DefaultSource"

    // Batch write
    df.write.format(verticaDataSource).options(verticaProperties).mode("overwrite").save()

    // Streaming write
    val saveToVertica: DataFrame => Unit =
      dataFrame =>
        dataFrame.write.format(verticaDataSource).options(verticaProperties).mode("append").save()

    // writeStream only works on a streaming DataFrame (one created with spark.readStream,
    // e.g. from Kafka); calling it on the batch df above throws an AnalysisException
    if (df.isStreaming) {
      val checkpointLocation = "/user/hadoop/planheader/checkpoint"
      val streamingQuery = df.writeStream
        .outputMode(OutputMode.Append)
        .option("checkpointLocation", checkpointLocation)
        //.trigger(Trigger.ProcessingTime("25 seconds"))
        // Explicit parameter types avoid the overload ambiguity with the Java VoidFunction2 variant
        .foreachBatch { (batchDf: DataFrame, _: Long) => saveToVertica(batchDf) }
        .start()

      streamingQuery.awaitTermination()
    }
  }
}
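On the Spark side, `explode` fails here because schema inference turns `demoValues` into a struct, so the dynamic keys become column names rather than data. One common workaround, sketched below under stated assumptions, is to supply an explicit schema that declares `demoValues` as a `MapType`; `explode` on a map column then produces one row per dynamic key, in columns named `key` and `value`. The object name `DemoValuesAsMap`, the trimmed sample, and the field types in `demoSchema` are illustrative assumptions taken from the sample JSON, not part of the original post; it assumes `spark-sql` (2.2 or later) on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types._

// Hypothetical example object; field names and types are assumed from the sample JSON
object DemoValuesAsMap {
  // One entry under "demoValues": its fields are fixed, only the keys vary
  val demoSchema: StructType = StructType(Seq(
    StructField("cpm", DoubleType),
    StructField("cpp", LongType),
    StructField("vpvh", DoubleType),
    StructField("imps", DoubleType),
    StructField("rcImps", DoubleType),
    StructField("ue", DoubleType),
    StructField("grps", DoubleType),
    StructField("demoId", StringType)
  ))

  // "demoValues" is declared as a map so the dynamic keys stay data;
  // fields not listed here are ignored by the JSON reader
  val schema: StructType = StructType(Seq(
    StructField("_id", StructType(Seq(StructField("planId", StringType)))),
    StructField("demoValues", MapType(StringType, demoSchema))
  ))

  private def demoEntry(id: String): String =
    s"""{"cpm":0.0,"cpp":0,"vpvh":0.0,"imps":0.0,"rcImps":0.0,"ue":0.0,"grps":0.0,"demoId":"$id"}"""

  // Trimmed version of the sample document from the question
  val sampleJson: String =
    """{"_id":{"planId":"5f34dab0c661d8337097afb9"},"demoValues":{""" +
      Seq("66", "63", "21").map(id => "\"" + id + "\":" + demoEntry(id)).mkString(",") +
      "}}"

  // explode on a map column yields one row per entry, in columns "key" and "value"
  def demoRows(spark: SparkSession): DataFrame = {
    import spark.implicits._
    spark.read.schema(schema).json(Seq(sampleJson).toDS())
      .select(col("_id.planId").as("planId"), explode(col("demoValues")))
      .select(col("planId"), col("key").as("demoKey"), col("value.*"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("DemoValuesAsMap").master("local[*]").getOrCreate()
    demoRows(spark).show(false)
    spark.stop()
  }
}
```

The result is a flat table with one row per demo (planId, demoKey, cpm, cpp, ..., demoId), which is easier to load into a relational table than one struct column per key. For a Kafka stream, the same `schema` should be applicable to the message value with `from_json`, though that variant is not shown here.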


    

expected output:

Solution

Here is what I did using Vertica:

I created a flex table, loaded the file into it, and used Vertica's flex table function COMPUTE_FLEXTABLE_KEYS_AND_BUILD_VIEW() to get a view.

It turned out to be a single-row table:

-- CREATE the Flex Table
CREATE FLEX TABLE demovals();

-- copy it using the built-in JSON parser (it creates a map container
-- with all key-value pairs)
COPY demovals FROM '/home/gessnerm/1/Vertica/supp/l.json' PARSER fjsonparser();
-- out vsql:/home/gessnerm/._vfv.sql:1: ROLLBACK 4213:  Object "demovals" already exists
-- out  Rows Loaded 
-- out -------------
-- out            1
-- out (1 row)
-- out 
-- out Time: First fetch (1 row): 112.540 ms. All rows formatted: 112.623 ms
-- the function on the next line guesses the data types in the values
-- matching the keys, stores the guessed data types in a second table,
-- and builds a view from all found keys
SELECT COMPUTE_FLEXTABLE_KEYS_AND_BUILD_VIEW('demovals');
-- out                                  COMPUTE_FLEXTABLE_KEYS_AND_BUILD_VIEW                                  
-- out --------------------------------------------------------------------------------------------------------
-- out  Please see dbadmin.demovals_keys for updated keys
-- out The view dbadmin.demovals_view is ready for querying
-- out (1 row)
-- out 
-- out Time: First fetch (1 row): 467.551 ms. All rows formatted: 467.583 ms
-- now, select from the single-row view on the flex table, 
-- one row per column in the report (extended view: "\x" )
\x
SELECT * FROM dbadmin.demovals_view;
-- out -[ RECORD 1 ]---------------+-------------------------------------
-- out _id.channelid               | G7k5_-HWRIuF0-afe7q-rQ
-- out _id.line                    | b443e9c0-fafc-4791-87c9-8e32339c7f3c
-- out _id.period.enddate          | 20200927
-- out _id.period.name             | 3Q20
-- out _id.period.startdate        | 20200629
-- out _id.planid                  | 5f34dab0c661d8337097afb9
-- out _id.version.$numberlong     | 1
-- out demovalues.21.cpm           | 0.00
-- out demovalues.21.cpp           | 0
-- out demovalues.21.demoid        | 21
-- out demovalues.21.grps          | 0.00
-- out demovalues.21.imps          | 0.00
-- out demovalues.21.rcimps        | 0.00
-- out demovalues.21.ue            | 0.00
-- out demovalues.21.vpvh          | 0.00
-- out demovalues.63.cpm           | 0.00
-- out demovalues.63.cpp           | 0
-- out demovalues.63.demoid        | 63
-- out demovalues.63.grps          | 0.00
-- out demovalues.63.imps          | 0.00
-- out demovalues.63.rcimps        | 0.00
-- out demovalues.63.ue            | 0.00
-- out demovalues.63.vpvh          | 0.00
-- out demovalues.66.cpm           | 0.00
-- out demovalues.66.cpp           | 0
-- out demovalues.66.demoid        | 66
-- out demovalues.66.grps          | 0.00
-- out demovalues.66.imps          | 0.00
-- out demovalues.66.rcimps        | 0.00
-- out demovalues.66.ue            | 0.00
-- out demovalues.66.vpvh          | 0.00
-- out hh-imps                     | 0.00
-- out unitrates.rate              | 0.00
-- out unitrates.rcrate            | 0.00
-- out unitrates.units.$numberlong | 0
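In the view above, each column is named after the path of its key in the document, joined with dots and shown in lowercase. As an illustration only, the stdlib-only Scala sketch below reproduces that naming for an already-parsed document. The nested `Map[String, Any]` input stands in for a JSON parser's output (an assumption, so no JSON library is needed), the object `FlattenKeys` is hypothetical, and Vertica's data-type guessing is not modelled.

```scala
// Hypothetical sketch mimicking the dotted, lowercased column names of the flex view
object FlattenKeys {
  // Recursively flattens nested maps into dotted paths, e.g.
  // Map("_id" -> Map("planId" -> "x")) becomes Map("_id.planid" -> "x")
  def flatten(node: Map[String, Any], prefix: String = ""): Map[String, Any] =
    node.flatMap { case (k, v) =>
      val path = if (prefix.isEmpty) k.toLowerCase else s"$prefix.${k.toLowerCase}"
      v match {
        case child: Map[_, _] => flatten(child.asInstanceOf[Map[String, Any]], path)
        case leaf             => Map(path -> leaf)
      }
    }

  def main(args: Array[String]): Unit = {
    // Small excerpt of the sample document, already parsed into nested maps
    val doc: Map[String, Any] = Map(
      "_id" -> Map("planId" -> "5f34dab0c661d8337097afb9", "version" -> Map("$numberLong" -> "1")),
      "demoValues" -> Map(
        "21" -> Map("cpm" -> 0.0, "demoId" -> "21"),
        "63" -> Map("cpm" -> 0.0, "demoId" -> "63")
      ),
      "hh-imps" -> 0.0
    )
    // Print one "path | value" line per leaf, sorted by path like the view output
    flatten(doc).toSeq.sortBy(_._1).foreach { case (k, v) => println(f"$k%-25s | $v") }
  }
}
```

Because every dynamic key becomes part of a column name, a new demo id in the data produces new columns, which is why the keys table and view have to be recomputed when the data changes.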

For the children (the entries under demoValues), for example:

CREATE FLEX TABLE children();
TRUNCATE TABLE children;
COPY children FROM '/home/gessnerm/1/Vertica/supp/l.json' PARSER fjsonparser(start_point='demoValues');
SELECT COMPUTE_FLEXTABLE_KEYS_AND_BUILD_VIEW('children');
\x
SELECT * FROM dbadmin.children_view;
-- out Time: First fetch (0 rows): 7.303 ms. All rows formatted: 7.308 ms
-- out  Rows Loaded 
-- out -------------
-- out            1
-- out (1 row)
-- out 
-- out Time: First fetch (1 row): 13.848 ms. All rows formatted: 13.876 ms
-- out                                  COMPUTE_FLEXTABLE_KEYS_AND_BUILD_VIEW                                  
-- out --------------------------------------------------------------------------------------------------------
-- out  Please see dbadmin.children_keys for updated keys
-- out The view dbadmin.children_view is ready for querying
-- out (1 row)
-- out 
-- out Time: First fetch (1 row): 140.381 ms. All rows formatted: 140.404 ms
-- out -[ RECORD 1 ]---
-- out 21.cpm    | 0.00
-- out 21.cpp    | 0
-- out 21.demoid | 21
-- out 21.grps   | 0.00
-- out 21.imps   | 0.00
-- out 21.rcimps | 0.00
-- out 21.ue     | 0.00
-- out 21.vpvh   | 0.00
-- out 63.cpm    | 0.00
-- out 63.cpp    | 0
-- out 63.demoid | 63
-- out 63.grps   | 0.00
-- out 63.imps   | 0.00
-- out 63.rcimps | 0.00
-- out 63.ue     | 0.00
-- out 63.vpvh   | 0.00
-- out 66.cpm    | 0.00
-- out 66.cpp    | 0
-- out 66.demoid | 66
-- out 66.grps   | 0.00
-- out 66.imps   | 0.00
-- out 66.rcimps | 0.00
-- out 66.ue     | 0.00
-- out 66.vpvh   | 0.00
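With start_point='demoValues' the view is still one wide row, but every column name now begins with the dynamic key. If one row per demo is wanted for loading, the part before the first dot can serve as a grouping key. A stdlib-only sketch, assuming the row has already been fetched into a hypothetical `Map` from column name to value text (the object `RegroupByDemo` is also hypothetical):

```scala
// Hypothetical sketch: regroup "21.cpm"-style columns into one record per demo id
object RegroupByDemo {
  // Splits "21.cpm" into ("21", "cpm") and groups the metrics by the dynamic key;
  // columns without a dot are skipped
  def regroup(wide: Map[String, String]): Map[String, Map[String, String]] =
    wide.toSeq
      .collect { case (column, value) if column.contains('.') =>
        val dot = column.indexOf('.')
        (column.take(dot), column.drop(dot + 1), value)
      }
      .groupBy(_._1)
      .map { case (demo, entries) => demo -> entries.map(e => e._2 -> e._3).toMap }

  def main(args: Array[String]): Unit = {
    // Excerpt of the wide row shown in the children_view output
    val wide = Map(
      "21.cpm" -> "0.00", "21.demoid" -> "21",
      "63.cpm" -> "0.00", "63.demoid" -> "63",
      "66.cpm" -> "0.00", "66.demoid" -> "66"
    )
    regroup(wide).toSeq.sortBy(_._1).foreach { case (demo, metrics) =>
      println(s"$demo -> ${metrics.toSeq.sorted.mkString(", ")}")
    }
  }
}
```

Each inner map then corresponds to one demo entry and could be inserted as a separate row, keyed by the demo id.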
