Flatten any nested json string and convert to dataframe using spark scala


Question

I am trying to create a DataFrame from an arbitrary JSON string. The JSON is generally very deep and sometimes heavily nested. The JSON string looks like:

val json_string = """{
                   "Total Value": 3,
                   "Topic": "Example",
                   "values": [
                              {
                                "value1": "#example1",
                                "points": [
                                           [
                                           "123",
                                           "156"
                                          ]
                                    ],
                                "properties": {
                                 "date": "12-04-19",
                                 "model": "Model example 1"
                                    }
                                 },
                               {"value2": "#example2",
                                "points": [
                                           [
                                           "124",
                                           "157"
                                          ]
                                    ],
                                "properties": {
                                 "date": "12-05-19",
                                 "model": "Model example 2"
                                    }
                                 }
                              ]
                       }"""

The output I expect is:

+-----------+-------+--------+----------------+----------------+----------------------+-----------------------+
|Total Value|Topic  |values  |values.points[0]|values.points[1]|values.properties.date|values.properties.model|
+-----------+-------+--------+----------------+----------------+----------------------+-----------------------+
|3          |Example|example1|123             |156             |12-04-19              |Model example 1        |
|3          |Example|example2|124             |157             |12-05-19              |Model example 2        |
+-----------+-------+--------+----------------+----------------+----------------------+-----------------------+

I am currently flattening by picking a key in the JSON to obtain the schema and then flattening from there, but I don't want to flatten this way. The logic should be independent of any given key and flatten accordingly, as shown in the output above. Even after supplying a key ("values" in this case), I still get two rows for the same record because points is an array, so points[0] ends up in one row and points[1] in another. My Spark Scala code is:

val key = "values" //Ideally this should not be given in my case.
val jsonFullDFSchemaString = spark.read.json(json_location).select(col(key)).schema.json; // changing values to reportData
val jsonFullDFSchemaStructType = DataType.fromJson(jsonFullDFSchemaString).asInstanceOf[StructType]
val df = spark.read.schema(jsonFullDFSchemaStructType).json(json_location);
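
For comparison, a key-independent read that lets Spark infer the full schema directly would look like the sketch below (the `multiLine` option is assumed to be needed because a single JSON document spans several lines):

// Key-independent read: infer the full schema without selecting a key.
// multiLine is required when one JSON document spans multiple lines.
val fullDF = spark.read.option("multiLine", true).json(json_location)
fullDF.printSchema()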

Now I am flattening it:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StructType}

def flattenDataframe(df: DataFrame): DataFrame = {
  // All top-level fields of the current schema.
  val fields = df.schema.fields
  val fieldNames = fields.map(_.name)

  for (field <- fields) {
    val fieldName = field.name
    field.dataType match {
      // Arrays: explode into one row per element, then recurse.
      case _: ArrayType =>
        val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName)
        val fieldNamesAndExplode = fieldNamesExcludingArray ++
          Array(s"explode_outer($fieldName) as $fieldName")
        val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
        return flattenDataframe(explodedDf)

      // Structs: promote children to the top level, sanitize names, then recurse.
      case structType: StructType =>
        val childFieldNames = structType.fieldNames.map(child => s"$fieldName.$child")
        val newFieldNames = fieldNames.filter(_ != fieldName) ++ childFieldNames
        val renamedCols = newFieldNames.map(x =>
          col(x).as(x.replace(".", "_").replace("$", "_").replace("__", "_").replace(" ", "").replace("-", "")))
        val explodedDf = df.select(renamedCols: _*)
        return flattenDataframe(explodedDf)

      case _ => // leaf type: nothing to do
    }
  }
  df
}

Finally, getting the flattened dataframe from the JSON:

val tableSchemaDF = flattenDataframe(df)
tableSchemaDF.show(false) // show the rows; println(tableSchemaDF) would only print the column summary

So, ideally, any JSON file should get flattened as shown above, without supplying any root key and without producing two rows per record. I hope I have given enough detail. Any help will be appreciated. Thanks.

Please note: the JSON data comes from an API, so it is not certain that the root key 'values' will be present. That is why I am not supplying a key for flattening.
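
One way to cope with that uncertainty is to inspect the inferred schema before deciding whether to drill into a key. A minimal sketch (the `hasValues` and `toFlatten` names are illustrative):

import org.apache.spark.sql.functions.col

// Guard against a missing root key by checking the inferred schema
// instead of hard-coding "values".
val hasValues = df.schema.fieldNames.contains("values")
val toFlatten = if (hasValues) df.select(col("values")) else df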

Answer

Here's a solution based on recursion; it's just a bit "hacky" at the end because of your specific requirements:

import scala.annotation.tailrec
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, col, explode}
import org.apache.spark.sql.types.{ArrayType, StructType}

@tailrec
def recurs(df: DataFrame): DataFrame = {
  // Stop once no column is an array of structs or a struct.
  if (!df.schema.fields.exists(_.dataType match {
    case ArrayType(StructType(_), _) | StructType(_) => true
    case _ => false
  })) df
  else {
    val columns = df.schema.fields.map(f => f.dataType match {
      case _: ArrayType => explode(col(f.name)).as(f.name) // one row per array element
      case _: StructType => col(s"${f.name}.*")            // promote struct children
      case _ => col(f.name)
    })
    recurs(df.select(columns: _*))
  }
}
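
Note the @tailrec annotation: the recursive call is the last operation in the function, so the Scala compiler verifies it can be compiled down to a loop, which keeps deeply nested schemas from blowing the stack.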

import spark.implicits._ // for the $"colName" syntax

val recursedDF = recurs(df)
// Merge the mutually exclusive value1/value2 columns into a single "values" column.
val valuesColumns = recursedDF.columns.filter(_.startsWith("value"))
val projectionDF = recursedDF
  .withColumn("values", coalesce(valuesColumns.map(col): _*))
  .withColumn("point[0]", $"points".getItem(0))
  .withColumn("point[1]", $"points".getItem(1))
  .drop(valuesColumns :+ "points": _*)
projectionDF.show(false)

Output:

+-------+-----------+--------+---------------+---------+--------+--------+
|Topic  |Total Value|date    |model          |values   |point[0]|point[1]|
+-------+-----------+--------+---------------+---------+--------+--------+
|Example|3          |12-04-19|Model example 1|#example1|123     |156     |
|Example|3          |12-05-19|Model example 2|#example2|124     |157     |
+-------+-----------+--------+---------------+---------+--------+--------+
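
The "hacky" part is the final projection: coalesce works here because each exploded row carries either value1 or value2 (never both), so taking the first non-null of those columns collapses them into a single values column, and getItem(0)/getItem(1) split the remaining points array into the two expected columns.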
