Flatten nested json in Scala Spark Dataframe


Question

I have multiple JSONs coming in from various REST APIs, and I don't know their schema in advance. I am unable to use the explode function of DataFrames directly, because I don't know the column names that Spark creates when it infers the schema.

1. Can we recover the keys of the nested array elements by decoding dataframe.schema.fields, given that Spark only provides the value part in the rows of the DataFrame and takes the top-level key as the column name? (See the sketch below.)
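
The kind of runtime inspection being asked about here is available through the schema. A minimal sketch, assuming a DataFrame df has already been loaded:

df.schema.fields.foreach { field =>
  // e.g. prints "stackoverflow: array" for the sample data below
  println(s"${field.name}: ${field.dataType.typeName}")
}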

The DataFrame --

+--------------------+
|       stackoverflow|
+--------------------+
|[[[Martin Odersky...|
+--------------------+

Is there an optimal way to flatten the JSON using DataFrame methods, determining the schema at run time?

Sample JSON:

{
  "stackoverflow": [{
    "tag": {
      "id": 1,
      "name": "scala",
      "author": "Martin Odersky",
      "frameworks": [
        {
          "id": 1,
          "name": "Play Framework"
        },
        {
          "id": 2,
          "name": "Akka Framework"
        }
      ]
    }
  },
    {
      "tag": {
        "id": 2,
        "name": "java",
        "author": "James Gosling",
        "frameworks": [
          {
            "id": 1,
            "name": "Apache Tomcat"
          },
          {
            "id": 2,
            "name": "Spring Boot"
          }
        ]
      }
    }
  ]
}
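
For reference, a DataFrame like the one shown above can be produced by reading the sample JSON with Spark's multi-line JSON reader; spark.read.json infers the nested schema at read time. The path "sample.json" is a placeholder for illustration:

// Spark infers the nested schema while reading; no explode needed yet.
val df = spark.read
  .option("multiLine", true) // the sample document spans multiple lines
  .json("sample.json")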

Note - We need to do all the operations with DataFrames, because a huge amount of data is coming in and we cannot afford to parse each and every JSON individually.

Answer

Try to avoid flattening all columns if you can. I created a helper function, so you can call df.explodeColumns directly on a DataFrame.

The code below will flatten multi-level array and struct type columns.

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec

implicit class DFHelpers(df: DataFrame) {
    // Builds a projection that lifts each struct field to a top-level column
    // named "<parent>_<child>"; non-struct columns pass through unchanged.
    def columns: Seq[Column] = {
      df.schema.fields.toSeq.flatMap {
        case column if column.dataType.isInstanceOf[StructType] =>
          column.dataType.asInstanceOf[StructType].fields.map { field =>
            col(s"${column.name}.${field.name}").as(s"${column.name}_${field.name}")
          }.toList
        case column => List(col(column.name))
      }
    }

    // Recursively flattens struct columns until none remain.
    def flatten: DataFrame =
      if (df.schema.exists(_.dataType.isInstanceOf[StructType]))
        df.select(columns: _*).flatten
      else
        df

    // Explodes every array column, one level per pass, flattening the structs
    // each explode exposes, and repeats until no array columns are left.
    def explodeColumns: DataFrame = {
      @tailrec
      def loop(cdf: DataFrame): DataFrame =
        cdf.schema.fields.filter(_.dataType.typeName == "array") match {
          case arrays if arrays.nonEmpty =>
            loop(arrays.foldLeft(cdf) { (dfa, field) =>
              dfa.withColumn(field.name, explode_outer(col(field.name))).flatten
            })
          case _ => cdf
        }
      loop(df.flatten)
    }
}

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec
defined class DFHelpers

Flattening the columns

scala> df.printSchema
root
 |-- stackoverflow: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- tag: struct (nullable = true)
 |    |    |    |-- author: string (nullable = true)
 |    |    |    |-- frameworks: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)


scala> df.explodeColumns.printSchema
root
 |-- author: string (nullable = true)
 |-- frameworks_id: long (nullable = true)
 |-- frameworks_name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

scala>
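
Applied to the sample JSON from the question, the flattened DataFrame should contain one row per framework. The output below is derived by hand from the sample input rather than captured from a run:

scala> df.explodeColumns.show(false)
+--------------+-------------+---------------+--+-----+
|author        |frameworks_id|frameworks_name|id|name |
+--------------+-------------+---------------+--+-----+
|Martin Odersky|1            |Play Framework |1 |scala|
|Martin Odersky|2            |Akka Framework |1 |scala|
|James Gosling |1            |Apache Tomcat  |2 |java |
|James Gosling |2            |Spring Boot    |2 |java |
+--------------+-------------+---------------+--+-----+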

