Flattening a json file using Spark and Scala
Question
I have a json file like this:
{
  "Item Version" : 1.0,
  "Item Creation Time" : "2019-04-14 14:15:09",
  "Trade Dictionary" : {
    "Country" : "India",
    "TradeNumber" : "1",
    "action" : {
      "Action1" : false,
      "Action2" : true,
      "Action3" : false
    },
    "Value" : "XXXXXXXXXXXXXXX",
    "TradeRegion" : "Global"
  },
  "Prod" : {
    "Type" : "Driver",
    "Product Dic" : { },
    "FX Legs" : [ {
      "Spot Date" : "2019-04-16",
      "Value" : true
    } ]
  },
  "Payments" : {
    "Payment Details" : [ {
      "Payment Date" : "2019-04-11",
      "Payment Type" : "Rej"
    } ]
  }
}
I need a table in the following format:
Version|Item Creation Time|Country|TradeNumber|Action1|Action2|Action3|Value |TradeRegion|Type|Product Dic|Spot Date |Value|Payment Date|Payment Type |
1 |2019-04-14 14:15 | India| 1 | false| true | false |xxxxxx|Global |Driver|{} |2019-04-16 |True |2019-11-14 |Rej
So it should just iterate over each key-value pair, using the key as the column name and its value as the table value.
My current code:
val data2 = data.withColumn("vars", explode(array($"Product")))
  .withColumn("subs", explode($"vars.FX Legs"))
  .select($"vars.*", $"subs.*")
The problem here is that I have to provide the column names myself. Is there any way to make this more generic?
Answer
Since you have both array and struct columns mixed together at multiple levels, it is not that simple to create a general solution. The main problem is that the explode function must be executed on every array column, and explode only unwraps one level of nesting at a time.
The simplest solution I can come up with uses recursion to check for any struct or array columns. If there are any, those are flattened and then we check again (after flattening there will be additional columns, which can themselves be arrays or structs, hence the complexity). The flattenStruct part is from here.
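The recursion at the heart of this approach can be illustrated without Spark at all. Below is a minimal, Spark-free sketch (the helper flattenMap and the sample data are hypothetical names for illustration): plain nested Maps stand in for struct columns, and each leaf value is emitted under its dot-separated path, just as flattenStruct does for a StructType:

```scala
// Spark-free illustration of the recursive flattening idea:
// nested Maps play the role of struct columns.
def flattenMap(m: Map[String, Any], prefix: String = null): Map[String, Any] =
  m.flatMap { case (k, v) =>
    val key = if (prefix == null) k else prefix + "." + k
    v match {
      // A nested Map is a "struct": recurse with the extended prefix.
      case inner: Map[String, Any] @unchecked => flattenMap(inner, key)
      // Anything else is a leaf: emit it under its qualified name.
      case leaf => Map(key -> leaf)
    }
  }

val sample: Map[String, Any] = Map(
  "Item Version" -> 1.0,
  "Trade Dictionary" -> Map(
    "Country" -> "India",
    "action"  -> Map("Action1" -> false, "Action2" -> true)
  )
)

val flat = flattenMap(sample)
// flat("Trade Dictionary.action.Action2") is true
```

Arrays have no analogue in this sketch; in Spark they additionally require explode, which is why flattenSchema below alternates between the struct case and the array case.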
Code:
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.StructType

// Recursively expand a StructType into a flat list of fully qualified columns.
def flattenStruct(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else prefix + "." + f.name
    f.dataType match {
      case st: StructType => flattenStruct(st, colName)
      case _              => Array(col(colName))
    }
  })
}
def flattenSchema(df: DataFrame): DataFrame = {
  val structExists = df.schema.fields.exists(_.dataType.typeName == "struct")
  val arrayCols = df.schema.fields.filter(_.dataType.typeName == "array").map(_.name)
  if (structExists) {
    // Pull every struct field up to the top level, then check again.
    flattenSchema(df.select(flattenStruct(df.schema): _*))
  } else if (arrayCols.nonEmpty) {
    // Explode each array column into one row per element, then check again.
    val newDF = arrayCols.foldLeft(df) { (tempDf, colName) =>
      tempDf.withColumn(colName, explode(col(colName)))
    }
    flattenSchema(newDF)
  } else {
    df
  }
}
Running the above method on the input dataframe:
flattenSchema(data)
will give a dataframe with the following schema:
root
|-- Item Creation Time: string (nullable = true)
|-- Item Version: double (nullable = true)
|-- Payment Date: string (nullable = true)
|-- Payment Type: string (nullable = true)
|-- Spot Date: string (nullable = true)
|-- Value: boolean (nullable = true)
|-- Product Dic: string (nullable = true)
|-- Type: string (nullable = true)
|-- Country: string (nullable = true)
|-- TradeNumber: string (nullable = true)
|-- TradeRegion: string (nullable = true)
|-- Value: string (nullable = true)
|-- Action1: boolean (nullable = true)
|-- Action2: boolean (nullable = true)
|-- Action3: boolean (nullable = true)
To keep the prefix of the struct columns in the names of the new columns, you only need to adjust the last case in the flattenStruct function:
case _ => Array(col(colName).as(colName.replace(".", "_")))
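As a quick check of what the alias produces, the replace call maps a fully qualified struct path to a flat, underscore-separated column name (the example path is taken from the sample schema above):

```scala
// "Trade Dictionary.action.Action1" becomes a single flat column name.
val qualified = "Trade Dictionary.action.Action1"
val flatName  = qualified.replace(".", "_")
// flatName == "Trade Dictionary_action_Action1"
```

Without this change, flattened fields that share a leaf name (like the two Value columns in the schema printed above) would collide.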