Spark - 将具有不同架构(列名和序列)的数据帧合并/联合到具有主通用架构的数据帧 [英] Spark - Merge / Union DataFrame with Different Schema (column names and sequence) to a DataFrame with Master common schema
问题描述
我尝试通过 df.schema() 将架构作为通用架构并将所有 CSV 文件加载到其中.但是分配的架构失败,其他 CSV 文件的标题不匹配
I tried taking a schema as a common schema by df.schema() and load all the CSV files to it .But fails as to the assigned schema , the headers of other CSV files doesnot match
任何建议将不胜感激.如在函数或火花脚本中
Any suggestions would be appreciated. as in a function or spark script
推荐答案
据我所知.您想联合/合并具有不同模式的文件(尽管是一个主模式的子集)..我写了这个函数 UnionPro,我认为它正好适合你的要求 -
as I understand it. You want to Union / Merge files with different schemas ( though subset of one Master Schema) .. I wrote this function UnionPro which I think just suits your requirement -
编辑 - 添加了 Pyspark 版本
EDIT - Added a Pyspark version
def unionPro(DFList: List[DataFrame], spark: org.apache.spark.sql.SparkSession): DataFrame = {
/**
* This Function Accepts DataFrame with same or Different Schema/Column Order.With some or none common columns
* Creates a Unioned DataFrame
*/
import spark.implicits._
val MasterColList: Array[String] = DFList.map(_.columns).reduce((x, y) => (x.union(y))).distinct
def unionExpr(myCols: Seq[String], allCols: Seq[String]): Seq[org.apache.spark.sql.Column] = {
allCols.toList.map(x => x match {
case x if myCols.contains(x) => col(x)
case _ => lit(null).as(x)
})
}
// Create EmptyDF , ignoring different Datatype in StructField and treating them same based on Name ignoring cases
val masterSchema = StructType(DFList.map(_.schema.fields).reduce((x, y) => (x.union(y))).groupBy(_.name.toUpperCase).map(_._2.head).toArray)
val masterEmptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], masterSchema).select(MasterColList.head, MasterColList.tail: _*)
DFList.map(df => df.select(unionExpr(df.columns, MasterColList): _*)).foldLeft(masterEmptyDF)((x, y) => x.union(y))
}
这是它的示例测试 -
val aDF = Seq(("A", 1), ("B", 2)).toDF("Name", "ID")
val bDF = Seq(("C", 1), ("D", 2)).toDF("Name", "Sal")
unionPro(List(aDF, bDF), spark).show
输出为 -
+----+----+----+
|Name| ID| Sal|
+----+----+----+
| A| 1|null|
| B| 2|null|
| C|null| 1|
| D|null| 2|
+----+----+----+
这是它的 Pyspark 版本 -
Here's Pyspark version of it -
def unionPro(DFList: List[DataFrame], caseDiff: str = "N") -> DataFrame:
"""
:param DFList:
:param caseDiff:
:return:
This Function Accepts DataFrame with same or Different Schema/Column Order.With some or none common columns
Creates a Unioned DataFrame
"""
inputDFList = DFList if caseDiff == "N" else [df.select([F.col(x.lower) for x in df.columns]) for df in DFList]
# "This Preserves Order ( OrderedDict0-----------------------------------"
from collections import OrderedDict
## As columnNames ( String) are hashable
masterColStrList = list(OrderedDict.fromkeys(reduce(lambda x, y: x + y, [df.columns for df in inputDFList])))
# Create masterSchema ignoring different Datatype & Nullable in StructField and treating them same based on Name ignoring cases
ignoreNullable = lambda x: StructField(x.name, x.dataType, True)
import itertools
# to get reliable results by groupby iterable must be sorted by grouping key
# in sorted function key function( lambda) must be passed as named argument ( keyword argument)
# but by Sorting now, I lost original order of columns. Hence I'll use masterColStrList while returning final DF
masterSchema = StructType([list(y)[0] for x, y in itertools.groupby(
sorted(reduce(lambda x, y: x + y, [[ignoreNullable(x) for x in df.schema.fields] for df in inputDFList]),
key=lambda x: x.name),
lambda x: x.name)])
def unionExpr(myCols: List[str], allCols: List[str]) -> List[Column]:
return [F.col(x) if x in myCols else F.lit(None).alias(x) for x in allCols]
# Create Empty Dataframe
masterEmptyDF = spark.createDataFrame([], masterSchema)
return reduce(lambda x, y: x.unionByName(y),
[df.select(unionExpr(df.columns, masterColStrList)) for df in inputDFList], masterEmptyDF).select(
masterColStrList)
这篇关于Spark - 将具有不同架构(列名和序列)的数据帧合并/联合到具有主通用架构的数据帧的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!