Flattening a json file using Spark and Scala

Problem Description

I have a json file like this:

{    
  "Item Version" : 1.0,    
  "Item Creation Time" : "2019-04-14 14:15:09",        
  "Trade Dictionary" : {    
    "Country" : "India",    
    "TradeNumber" : "1",    
    "action" : {    
      "Action1" : false,    
      "Action2" : true,    
      "Action3" : false    
    },    
    "Value" : "XXXXXXXXXXXXXXX",    
    "TradeRegion" : "Global"    
  },    
  "Prod" : {    
    "Type" : "Driver",    
    "Product Dic" : { },    
    "FX Legs" : [ {    
      "Spot Date" : "2019-04-16",        
      "Value" : true    
    } ]    
  },    
  "Payments" : {    
    "Payment Details" : [ {    
      "Payment Date" : "2019-04-11",    
      "Payment Type" : "Rej"
    } ]
  }
}

I need a table in the following format:

Version | Item Creation Time  | Country | TradeNumber | Action1 | Action2 | Action3 | Value           | TradeRegion | Type   | Product Dic | Spot Date  | Value | Payment Date | Payment Type
1       | 2019-04-14 14:15:09 | India   | 1           | false   | true    | false   | XXXXXXXXXXXXXXX | Global      | Driver | {}          | 2019-04-16 | true  | 2019-04-11   | Rej

So it will just iterate over each key-value pair, putting the key as the column name and its value as the table value.
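
For reference, a DataFrame like data used below can be produced by reading the file in multi-line mode (a minimal sketch; the app name and path are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("flatten-json").getOrCreate()
import spark.implicits._

// multiLine is required because each JSON record spans several lines
val data = spark.read.option("multiLine", "true").json("/path/to/input.json")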

My current code:

// Wrap the "Prod" struct in an array so it can be exploded, then explode the nested "FX Legs" array
val data2 = data.withColumn("vars", explode(array($"Prod")))
  .withColumn("subs", explode($"vars.FX Legs"))
  .select($"vars.*", $"subs.*")

The problem here is that I have to provide the column names myself. Is there any way to make this more generic?

Answer

Since you have both array and struct columns mixed together at multiple levels, it is not that simple to create a general solution. The main problem is that the explode function must be applied to every array column, and new array columns can keep appearing as deeper structs are flattened.
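
For one known array at a fixed depth, a single explode is enough (a minimal sketch; oneLevel and leg are names of my own choosing):

// Expand the Prod struct with .*, then turn each FX Legs element into its own row
val oneLevel = data
  .select($"Prod.*")
  .withColumn("leg", explode($"FX Legs"))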

The simplest solution I can come up with uses recursion to check for any struct or array columns. If there are any, those are flattened, and then we check again (after flattening there will be additional columns which can be arrays or structs, hence the complexity). The flattenStruct part is from here.

Code:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.StructType

// Recursively collect a column reference for every leaf field of a struct,
// using dot notation ("parent.child") to reach nested fields
def flattenStruct(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    f.dataType match {
      case st: StructType => flattenStruct(st, colName)
      case _ => Array(col(colName))
    }
  })
}
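
Applied to the schema above, flattenStruct returns column references such as col("Trade Dictionary.Country"); array columns are left as-is at this stage and are handled by flattenSchema below.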

// Repeatedly expand struct columns and explode array columns until the
// schema contains only flat (non-nested) columns
def flattenSchema(df: DataFrame): DataFrame = {
  val structExists = df.schema.fields.exists(_.dataType.typeName == "struct")
  val arrayCols = df.schema.fields.filter(_.dataType.typeName == "array").map(_.name)

  if (structExists) {
    // Expand every struct one level with flattenStruct, then re-check
    flattenSchema(df.select(flattenStruct(df.schema): _*))
  } else if (arrayCols.nonEmpty) {
    // Explode each array column into one row per element
    val newDF = arrayCols.foldLeft(df) {
      (tempDf, colName) => tempDf.withColumn(colName, explode(col(colName)))
    }
    flattenSchema(newDF)
  } else {
    df
  }
}
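
Note that each recursive call re-inspects the schema from scratch: all structs are expanded first, and only then are arrays exploded, so arbitrarily deep mixtures of structs and arrays are eventually reduced to flat columns.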

Running the above method on the input dataframe:

flattenSchema(data)

will give a dataframe with the following schema:

root
 |-- Item Creation Time: string (nullable = true)
 |-- Item Version: double (nullable = true)
 |-- Payment Date: string (nullable = true)
 |-- Payment Type: string (nullable = true)
 |-- Spot Date: string (nullable = true)
 |-- Value: boolean (nullable = true)
 |-- Product Dic: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- TradeNumber: string (nullable = true)
 |-- TradeRegion: string (nullable = true)
 |-- Value: string (nullable = true)
 |-- Action1: boolean (nullable = true)
 |-- Action2: boolean (nullable = true)
 |-- Action3: boolean (nullable = true)
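
To materialize the table from the question, the flattened DataFrame can then be displayed or written out as usual (a usage sketch):

val flat = flattenSchema(data)
flat.show(false)  // for this input, a single row with every nested key in its own column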

To keep the struct column prefix in the names of the new columns, simply adjust the last case in the flattenStruct function:

case _ => Array(col(colName).as(colName.replace(".", "_")))
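
With that change, a nested field such as Trade Dictionary.Country becomes the column Trade Dictionary_Country instead of just Country, which also disambiguates the two Value columns in the schema above.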
