展平任何嵌套的 json 字符串并使用 spark scala 转换为数据帧 [英] Flatten any nested json string and convert to dataframe using spark scala

查看:32
本文介绍了展平任何嵌套的 json 字符串并使用 spark scala 转换为数据帧的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试从任何 json 字符串到数据帧创建数据帧.json 字符串通常很深,有时会嵌套.json 字符串是这样的:

I am trying to create dataframe from any json string to dataframe. The json string is generally very deep and nested some times. The json string is like:

val json_string = """{
                   "Total Value": 3,
                   "Topic": "Example",
                   "values": [
                              {
                                "value1": "#example1",
                                "points": [
                                           [
                                           "123",
                                           "156"
                                          ]
                                    ],
                                "properties": {
                                 "date": "12-04-19",
                                 "model": "Model example 1"
                                    }
                                 },
                               {"value2": "#example2",
                                "points": [
                                           [
                                           "124",
                                           "157"
                                          ]
                                    ],
                                "properties": {
                                 "date": "12-05-19",
                                 "model": "Model example 2"
                                    }
                                 }
                              ]
                       }"""

我期待的输出是:

+-----------+-----------+----------+------------------+------------------+------------------------+-----------------------------+
|Total Value| Topic     |values 1 | values.points[0] | values.points[1] | values.properties.date | values.properties.model |
+-----------+-----------+----------+------------------+------------------+------------------------+-----------------------------+
| 3         | Example   | example1 | 123              | 156              | 12-04-19               |  Model Example 1         |
| 3         | Example   | example2 | 124              | 157              | 12-05-19               |    Model example 2         
+-----------+-----------+----------+------------------+------------------+------------------------+-----------------------------+

我正在进行展平,但在 json 中选择了一些键来获取架构然后展平,但我不想以这种方式展平.它应该独立于任何要给出的键并相应地展平,如上面的输出所示.即使在这种情况下给出键值后,由于点是数组,我仍然为相同的记录获得 2 列,因此点 [0] 为一列,点 [1] 为不同的列.我的 Scala 火花代码是:

I am doing flattening but choosing some key in json for getting schema and then flattening but I don't want to flatten in this way. It should be independent of any key to be given and flatten accordingly as shown in output in above. Even after giving key that is values in this case, I am still getting 2 columns for same records due to the points is array so points[0] one columns and points[1] for different columns. My Scala spark code is:

val key = "values" //Ideally this should not be given in my case.
val jsonFullDFSchemaString = spark.read.json(json_location).select(col(key)).schema.json; // changing values to reportData
val jsonFullDFSchemaStructType = DataType.fromJson(jsonFullDFSchemaString).asInstanceOf[StructType]
val df = spark.read.schema(jsonFullDFSchemaStructType).json(json_location);

现在我正在使用扁平化:

Now for flattening I am using:

 def flattenDataframe(df: DataFrame): DataFrame = {
    //getting all the fields from schema
    val fields = df.schema.fields
    val fieldNames = fields.map(x => x.name)
    //length shows the number of fields inside dataframe
    val length = fields.length
    for (i <- 0 to fields.length - 1) {
      val field = fields(i)
      val fieldtype = field.dataType
      val fieldName = field.name
      fieldtype match {
        case arrayType: ArrayType =>
          val fieldName1 = fieldName
          val fieldNamesExcludingArray = fieldNames.filter(_ != fieldName1)
          val fieldNamesAndExplode = fieldNamesExcludingArray ++ Array(s"explode_outer($fieldName1) as $fieldName1")
          //val fieldNamesToSelect = (fieldNamesExcludingArray ++ Array(s"$fieldName1.*"))
          val explodedDf = df.selectExpr(fieldNamesAndExplode: _*)
          return flattenDataframe(explodedDf)

        case structType: StructType =>
          val childFieldnames = structType.fieldNames.map(childname => fieldName + "." + childname)
          val newfieldNames = fieldNames.filter(_ != fieldName) ++ childFieldnames
          val renamedcols = newfieldNames.map(x => (col(x.toString()).as(x.toString().replace(".", "_").replace("$", "_").replace("__", "_").replace(" ", "").replace("-", ""))))
          val explodedf = df.select(renamedcols: _*)
          return flattenDataframe(explodedf)
        case _ =>
      }
    }
    df
  }

现在终于从 json 中得到扁平化的数据框:

Now finally getting flatten dataframe from json:

val tableSchemaDF = flattenDataframe(df)
println(tableSchemaDF)

所以理想情况下,任何 json 文件都应该相应地变平,如我上面所示,无需提供任何根键,也无需创建 2 行.希望我已经提供了足够的细节.任何帮助将不胜感激.谢谢.

So ideally any json file should get flatten accordingly as I shown above without giving any root key and without creating 2 rows. Hope I have given enough details. Any help will be appreciated. Thanks.

请注意:Json 数据来自 API,因此不确定根键值"是否存在.这就是为什么我不打算提供扁平化的关键.

Please Note: The Json Data is coming from API so it's not certain that the root key 'values' will be there or not. That's why I am not going with giving key for flattening.

推荐答案

这是一个基于递归的解决方案,由于您有特殊性,所以最后有点hacky":

Here's a solution based on recursion, just a bit "hacky" at the end since you have specificities :

@tailrec
def recurs(df: DataFrame): DataFrame = {
  if(df.schema.fields.find(_.dataType match {
    case ArrayType(StructType(_),_) | StructType(_) => true
    case _ => false
  }).isEmpty) df
  else {
    val columns = df.schema.fields.map(f => f.dataType match {
      case _: ArrayType => explode(col(f.name)).as(f.name)
      case s: StructType => col(s"${f.name}.*")
      case _ => col(f.name)
    })
    recurs(df.select(columns:_*))
  }
}

val recursedDF = recurs(df)
val valuesColumns = recursedDF.columns.filter(_.startsWith("value"))
val projectionDF = recursedDF.withColumn("values", coalesce(valuesColumns.map(col):_*))
  .withColumn("point[0]", $"points".getItem(0))
  .withColumn("point[1]", $"points".getItem(1))
    .drop(valuesColumns :+ "points":_*)
projectionDF.show(false)

输出:

+-------+-----------+--------+---------------+---------+--------+--------+
|Topic  |Total Value|date    |model          |values   |point[0]|point[1]|
+-------+-----------+--------+---------------+---------+--------+--------+
|Example|3          |12-04-19|Model example 1|#example1|123     |156     |
|Example|3          |12-05-19|Model example 2|#example2|124     |157     |
+-------+-----------+--------+---------------+---------+--------+--------+

这篇关于展平任何嵌套的 json 字符串并使用 spark scala 转换为数据帧的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆