Flattening a json file using Spark and Scala

Question

I have a json file like this:

{    
  "Item Version" : 1.0,    
  "Item Creation Time" : "2019-04-14 14:15:09",        
  "Trade Dictionary" : {    
    "Country" : "India",    
    "TradeNumber" : "1",    
    "action" : {    
      "Action1" : false,    
      "Action2" : true,    
      "Action3" : false    
    },    
    "Value" : "XXXXXXXXXXXXXXX",    
    "TradeRegion" : "Global"    
  },    
  "Prod" : {    
    "Type" : "Driver",    
    "Product Dic" : { },    
    "FX Legs" : [ {    
      "Spot Date" : "2019-04-16",        
      "Value" : true    
    } ]    
  },    
  "Payments" : {    
    "Payment Details" : [ {    
      "Payment Date" : "2019-04-11",    
      "Payment Type" : "Rej"
    } ]
  }
}

I need a table in the following format:

Version|Item Creation Time |Country|TradeNumber|Action1|Action2|Action3|Value          |TradeRegion|Type  |Product Dic|Spot Date |Value|Payment Date|Payment Type|
1      |2019-04-14 14:15:09|India  |1          |false  |true   |false  |XXXXXXXXXXXXXXX|Global     |Driver|{}         |2019-04-16|true |2019-04-11  |Rej         |

So it should just iterate over each key-value pair, using the key as the column name and putting its value into the table.

My current code:

// requires: import spark.implicits._ for the $ column syntax
val data2 = data.withColumn("vars", explode(array($"Prod")))
  .withColumn("subs", explode($"vars.FX Legs"))
  .select($"vars.*", $"subs.*")

The problem here is that I have to provide the column names myself. Is there any way to make this more generic?

Answer

Since array and struct columns are mixed together across multiple levels, creating a general solution is not that simple. The main problem is that the explode function must be executed on every array column, wherever it appears in the schema.

The simplest solution I can come up with uses recursion to check for any struct or array columns. If there are any, they are flattened and then we check again (after flattening there will be additional columns, which can themselves be arrays or structs, hence the complexity). The flattenStruct part is from here.

Code:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.StructType

// Recursively expands struct columns into one Column per leaf field,
// addressing nested fields with dot notation.
def flattenStruct(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    f.dataType match {
      case st: StructType => flattenStruct(st, colName)
      case _              => Array(col(colName))
    }
  })
}

// Alternately flattens structs and explodes arrays until neither is left.
def flattenSchema(df: DataFrame): DataFrame = {
  val structExists = df.schema.fields.exists(_.dataType.typeName == "struct")
  val arrayCols = df.schema.fields.filter(_.dataType.typeName == "array").map(_.name)

  if (structExists) {
    flattenSchema(df.select(flattenStruct(df.schema): _*))
  } else if (arrayCols.nonEmpty) {
    // Explode each array column in turn, replacing it with its elements
    val newDF = arrayCols.foldLeft(df) {
      (tempDf, colName) => tempDf.withColumn(colName, explode(col(colName)))
    }
    flattenSchema(newDF)
  } else {
    df
  }
}
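
As a side note: explode drops any row whose array column is null or empty. If such rows need to survive the flattening, explode_outer (available since Spark 2.2) should work as a drop-in replacement inside the fold; a minimal sketch:

import org.apache.spark.sql.functions.explode_outer

val newDF = arrayCols.foldLeft(df) {
  // explode_outer keeps rows with null/empty arrays, emitting null instead
  (tempDf, colName) => tempDf.withColumn(colName, explode_outer(col(colName)))
}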

Running the above method on the input dataframe:

flattenSchema(data)

will give a dataframe with the following schema:

root
 |-- Item Creation Time: string (nullable = true)
 |-- Item Version: double (nullable = true)
 |-- Payment Date: string (nullable = true)
 |-- Payment Type: string (nullable = true)
 |-- Spot Date: string (nullable = true)
 |-- Value: boolean (nullable = true)
 |-- Product Dic: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- TradeNumber: string (nullable = true)
 |-- TradeRegion: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Action1: boolean (nullable = true)
 |-- Action2: boolean (nullable = true)
 |-- Action3: boolean (nullable = true)
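
For completeness, here is a minimal sketch of how data could be loaded and the result inspected; the SparkSession setup and the file path are assumptions, not part of the original question:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("FlattenJson")        // hypothetical app name
  .master("local[*]")            // assumption: local test run
  .getOrCreate()

// multiLine is needed because the sample JSON record spans multiple lines
val data = spark.read
  .option("multiLine", true)
  .json("/path/to/input.json")   // hypothetical path

flattenSchema(data).printSchema()

Also note that the flattened schema ends up with two columns named Value (one from Trade Dictionary, one from FX Legs), so selecting Value by name would be ambiguous. The adjustment below avoids this by keeping the struct prefixes.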

To keep the prefix of the struct columns in the names of the new columns, you only need to adjust the last case in the flattenStruct function:

case _ => Array(col(colName).as(colName.replace(".", "_")))
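
With this change every leaf column keeps its full path, so for example Country should come out as Trade Dictionary_Country, and the two clashing Value columns should surface under distinct names such as Trade Dictionary_Value and Prod_FX Legs_Value.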
