Flatten nested JSON in a Scala Spark DataFrame


Question

I have multiple JSONs coming from various REST APIs, and I don't know their schema in advance. I am unable to use the explode function of DataFrames, because I am unaware of the column names, which are created by the Spark API.

1. Can we recover the keys of the nested array elements by decoding the values from dataframe.schema.fields, given that Spark only provides the value part in the rows of the DataFrame and takes the top-level key as the column name?
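For context, the nested keys are visible at runtime in the DataFrame's schema object. A minimal sketch of walking it (assuming an existing DataFrame named df; printFieldPaths is a hypothetical helper, not a Spark API):

import org.apache.spark.sql.types.{ArrayType, StructField, StructType}

// Recursively walk the schema and print every nested field path,
// descending into struct columns and arrays of structs.
def printFieldPaths(fields: Seq[StructField], prefix: String = ""): Unit =
  fields.foreach { field =>
    val path = if (prefix.isEmpty) field.name else s"$prefix.${field.name}"
    field.dataType match {
      case s: StructType               => printFieldPaths(s.fields, path)
      case ArrayType(s: StructType, _) => printFieldPaths(s.fields, path)
      case _                           => println(path)
    }
  }

printFieldPaths(df.schema.fields)
// For the sample JSON below this would print paths such as
// stackoverflow.tag.author and stackoverflow.tag.frameworks.id.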

DataFrame:

+--------------------+
|       stackoverflow|
+--------------------+
|[[[Martin Odersky...|
+--------------------+

Is there any optimal way to flatten the JSON by using DataFrame methods, determining the schema at run time?

Sample JSON:

{
  "stackoverflow": [{
    "tag": {
      "id": 1,
      "name": "scala",
      "author": "Martin Odersky",
      "frameworks": [
        {
          "id": 1,
          "name": "Play Framework"
        },
        {
          "id": 2,
          "name": "Akka Framework"
        }
      ]
    }
  },
    {
      "tag": {
        "id": 2,
        "name": "java",
        "author": "James Gosling",
        "frameworks": [
          {
            "id": 1,
            "name": "Apache Tomcat"
          },
          {
            "id": 2,
            "name": "Spring Boot"
          }
        ]
      }
    }
  ]
}

Note: we need to do all the operations on DataFrames, because a huge amount of data is coming in and we cannot parse each and every JSON.

Answer

Note: the logic below might not work for Spark Structured Streaming directly.

I created a helper (an implicit class), so you can call df.explodeColumns directly on a DataFrame.

The code below flattens multi-level array and struct type columns.

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec

implicit class DFHelpers(df: DataFrame) {
    // For every top-level struct column, emit one aliased column per nested
    // field ("parent_child"); non-struct columns pass through unchanged.
    def columns = {
      df.schema.fields.flatMap { data =>
        data match {
          case column if column.dataType.isInstanceOf[StructType] => {
            column.dataType.asInstanceOf[StructType].fields.map { field =>
              val columnName = column.name
              val fieldName = field.name
              col(s"${columnName}.${fieldName}").as(s"${columnName}_${fieldName}")
            }.toList
          }
          case column => List(col(s"${column.name}"))
        }
      }
    }

    // Expand struct columns level by level until none remain.
    def flatten: DataFrame = {
      val empty = df.schema.filter(_.dataType.isInstanceOf[StructType]).isEmpty
      empty match {
        case false =>
          df.select(columns: _*).flatten
        case _ => df
      }
    }

    // Explode every array column (explode_outer keeps rows with null or
    // empty arrays), flatten the resulting structs, and recurse until no
    // array columns are left.
    def explodeColumns = {
      @tailrec
      def columns(cdf: DataFrame): DataFrame = cdf.schema.fields.filter(_.dataType.typeName == "array") match {
        case c if !c.isEmpty => columns(c.foldLeft(cdf)((dfa, field) => {
          dfa.withColumn(field.name, explode_outer(col(s"${field.name}"))).flatten
        }))
        case _ => cdf
      }
      columns(df.flatten)
    }
}

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec
defined class DFHelpers

Flatten the columns
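
The demo below assumes df holds the sample JSON from the question. A minimal sketch of one way to build it, assuming the JSON is saved in a file named stackoverflow.json (a hypothetical path):

val df = spark.read
  .option("multiLine", "true") // the sample JSON spans multiple lines
  .json("stackoverflow.json")  // assumed file name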

scala> df.printSchema
root
 |-- stackoverflow: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- tag: struct (nullable = true)
 |    |    |    |-- author: string (nullable = true)
 |    |    |    |-- frameworks: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)


scala> df.explodeColumns.printSchema
root
 |-- author: string (nullable = true)
 |-- frameworks_id: long (nullable = true)
 |-- frameworks_name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

scala>
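
To see the flattened rows rather than just the schema, something like the following can be used; the expected values are derived directly from the sample JSON:

df.explodeColumns.show(false)
// Expected rows (one per tag/framework combination):
// Martin Odersky | 1 | Play Framework | 1 | scala
// Martin Odersky | 2 | Akka Framework | 1 | scala
// James Gosling  | 1 | Apache Tomcat  | 2 | java
// James Gosling  | 2 | Spring Boot    | 2 | java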

