Spark-将具有不同架构(列名称和顺序)的DataFrame合并/合并到具有Master通用架构的DataFrame [英] Spark - Merge / Union DataFrame with Different Schema (column names and sequence) to a DataFrame with Master common schema

查看:216
本文介绍了Spark-将具有不同架构(列名称和顺序)的DataFrame合并/合并到具有Master通用架构的DataFrame的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我尝试通过df.schema()将一个模式作为通用模式并将所有CSV文件加载到其中,但是由于分配的模式失败,其他CSV文件的标题不匹配

I tried taking a schema as a common schema by df.schema() and load all the CSV files to it .But fails as to the assigned schema , the headers of other CSV files doesnot match

任何建议将不胜感激.就像在函数或Spark脚本中一样

Any suggestions would be appreciated. as in a function or spark script

推荐答案

.您想要合并/合并具有不同架构的文件(尽管是一个主架构的子集). 我写了这个函数UnionPro,我认为它很适合您的要求-

as I understand it. You want to Union / Merge files with different schemas ( though subset of one Master Schema) .. I wrote this function UnionPro which I think just suits your requirement -

编辑-添加了Pyspark版本

EDIT - Added a Pyspark version

def unionPro(DFList: List[DataFrame], spark: org.apache.spark.sql.SparkSession): DataFrame = {

    /**
     * This Function Accepts DataFrame with same or Different Schema/Column Order.With some or none common columns
     * Creates a Unioned DataFrame
     */

    import spark.implicits._

    val MasterColList: Array[String] = DFList.map(_.columns).reduce((x, y) => (x.union(y))).distinct

    def unionExpr(myCols: Seq[String], allCols: Seq[String]): Seq[org.apache.spark.sql.Column] = {
      allCols.toList.map(x => x match {
        case x if myCols.contains(x) => col(x)
        case _                       => lit(null).as(x)
      })
    }

    // Create EmptyDF , ignoring different Datatype in StructField and treating them same based on Name ignoring cases

    val masterSchema = StructType(DFList.map(_.schema.fields).reduce((x, y) => (x.union(y))).groupBy(_.name.toUpperCase).map(_._2.head).toArray)

    val masterEmptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], masterSchema).select(MasterColList.head, MasterColList.tail: _*)

    DFList.map(df => df.select(unionExpr(df.columns, MasterColList): _*)).foldLeft(masterEmptyDF)((x, y) => x.union(y))

  }

这是它的样本测试-


    val aDF = Seq(("A", 1), ("B", 2)).toDF("Name", "ID")
    val bDF = Seq(("C", 1), ("D", 2)).toDF("Name", "Sal")
    unionPro(List(aDF, bDF), spark).show

哪个输出为-

+----+----+----+
|Name|  ID| Sal|
+----+----+----+
|   A|   1|null|
|   B|   2|null|
|   C|null|   1|
|   D|null|   2|
+----+----+----+

这是它的Pyspark版本-

Here's Pyspark version of it -

def unionPro(DFList: List[DataFrame], caseDiff: str = "N") -> DataFrame:
    """
    :param DFList:
    :param caseDiff:
    :return:
    This Function Accepts DataFrame with same or Different Schema/Column Order.With some or none common columns
    Creates a Unioned DataFrame
    """
    inputDFList = DFList if caseDiff == "N" else [df.select([F.col(x.lower) for x in df.columns]) for df in DFList]

    # "This Preserves Order ( OrderedDict0-----------------------------------"
    from collections import OrderedDict
    ## As columnNames ( String) are hashable
    masterColStrList = list(OrderedDict.fromkeys(reduce(lambda x, y: x + y, [df.columns for df in inputDFList])))

    # Create masterSchema ignoring different Datatype & Nullable  in StructField and treating them same based on Name ignoring cases
    ignoreNullable = lambda x: StructField(x.name, x.dataType, True)

    import itertools

    
    # to get reliable results by groupby iterable must be sorted by grouping key
    # in sorted function key function( lambda) must be passed as named argument ( keyword argument)
    # but by Sorting now, I lost original order of columns. Hence I'll use masterColStrList while returning final DF
    masterSchema = StructType([list(y)[0] for x, y in itertools.groupby(
        sorted(reduce(lambda x, y: x + y, [[ignoreNullable(x) for x in df.schema.fields] for df in inputDFList]),
               key=lambda x: x.name),
        lambda x: x.name)])

    def unionExpr(myCols: List[str], allCols: List[str]) -> List[Column]:
        return [F.col(x) if x in myCols else F.lit(None).alias(x) for x in allCols]

    # Create Empty Dataframe
    masterEmptyDF = spark.createDataFrame([], masterSchema)

    return reduce(lambda x, y: x.unionByName(y),
                  [df.select(unionExpr(df.columns, masterColStrList)) for df in inputDFList], masterEmptyDF).select(
        masterColStrList)

这篇关于Spark-将具有不同架构(列名称和顺序)的DataFrame合并/合并到具有Master通用架构的DataFrame的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆