“扁平化"时的 Spark AnalysisExceptionSpark SQL 中的数据帧 [英] Spark AnalysisException when "flattening" DataFrame in Spark SQL

Problem Description

I'm using the approach given here to flatten a DataFrame in Spark SQL. Here is my code:

package com.acme.etl.xml

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.{Column, SparkSession}

object RuntimeError {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("FlattenSchema").getOrCreate()
    val rowTag = "idocData"
    val dataFrameReader =
        spark.read
          .option("rowTag", rowTag)
    val xmlUri = "bad_011_1.xml"
    val df =
        dataFrameReader
          .format("xml")
          .load(xmlUri)
    val schema: StructType = df.schema
    val columns: Array[Column] = flattenSchema(schema)
    val df2 = df.select(columns: _*)

  }

  def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
    schema.fields.flatMap(f => {
      val colName: String = if (prefix == null) f.name else prefix + "." + f.name
      val dataType = f.dataType
      dataType match {
        case st: StructType => flattenSchema(st, colName)
        case _: StringType => Array(new org.apache.spark.sql.Column(colName))
        case _: LongType => Array(new org.apache.spark.sql.Column(colName))
        case _: DoubleType => Array(new org.apache.spark.sql.Column(colName))
        case arrayType: ArrayType => arrayType.elementType match {
          case structType: StructType => flattenSchema(structType, colName)
        }
        case _ => Array(new org.apache.spark.sql.Column(colName))
      }
    })
  }
}

Much of the time, this works fine. But for the XML given below:

<Receive xmlns="http://Microsoft.LobServices.Sap/2007/03/Idoc/3/ORDERS05/ZORDERS5/702/Receive">
    <idocData>
        <E2EDP01008GRP xmlns="http://Microsoft.LobServices.Sap/2007/03/Types/Idoc/3/ORDERS05/ZORDERS5/702">
            <E2EDPT1001GRP>
                <E2EDPT2001>
                    <DATAHEADERCOLUMN_DOCNUM>0000000141036013</DATAHEADERCOLUMN_DOCNUM>
                </E2EDPT2001>
                <E2EDPT2001>
                    <DATAHEADERCOLUMN_DOCNUM>0000000141036013</DATAHEADERCOLUMN_DOCNUM>
                </E2EDPT2001>
            </E2EDPT1001GRP>
        </E2EDP01008GRP>
        <E2EDP01008GRP xmlns="http://Microsoft.LobServices.Sap/2007/03/Types/Idoc/3/ORDERS05/ZORDERS5/702">
        </E2EDP01008GRP>
    </idocData>
</Receive>

this exception occurs:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`E2EDP01008GRP`.`E2EDPT1001GRP`.`E2EDPT2001`['DATAHEADERCOLUMN_DOCNUM']' due to data type mismatch: argument 2 requires integral type, however, ''DATAHEADERCOLUMN_DOCNUM'' is of string type.;;
'Project [E2EDP01008GRP#0.E2EDPT1001GRP.E2EDPT2001[DATAHEADERCOLUMN_DOCNUM] AS DATAHEADERCOLUMN_DOCNUM#3, E2EDP01008GRP#0._VALUE AS _VALUE#4, E2EDP01008GRP#0._xmlns AS _xmlns#5]
+- Relation[E2EDP01008GRP#0] XmlRelation(<function0>,Some(/Users/paulreiners/s3/cdi-events-partition-staging/content_acme_purchase_order_json_v1/bad_011_1.xml),Map(rowtag -> idocData, path -> /Users/paulreiners/s3/cdi-events-partition-staging/content_acme_purchase_order_json_v1/bad_011_1.xml),null)

What is causing this?

Recommended Answer

Your document contains a multi-valued array, so you can't flatten it completely in one pass, since you can't give both elements of the array the same column name. Also, it's usually a bad idea to use a dot within a column name, since it can easily confuse the Spark parser and the name will need to be escaped at all times.
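
For illustration (a minimal sketch with made-up data, not part of the original answer), a column whose name literally contains a dot has to be backtick-escaped, otherwise Spark parses the dot as struct-field access:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("DotEscape").master("local[*]").getOrCreate()
import spark.implicits._

// A toy DataFrame whose single column is literally named "a.b".
val df = Seq(1, 2).toDF("a.b")

df.select(col("`a.b`")).show()  // works: backticks escape the dot
// df.select(col("a.b"))        // fails to resolve: read as field `b` of a struct `a`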

The usual way to flatten such a dataset is to create a new row for each element of the array. You can use the explode function to do this, but you will need to call your flatten operation repeatedly, because explode calls can't be nested.
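
As a quick illustration of what explode does (a toy sketch with assumed data, separate from the full solution below):

import org.apache.spark.sql.functions.explode
import spark.implicits._  // assumes a SparkSession named `spark` is in scope

// One input row with a two-element array becomes two output rows.
val nested = Seq((1, Seq("x", "y"))).toDF("id", "items")
nested.select($"id", explode($"items").as("item")).show()
// +---+----+
// | id|item|
// +---+----+
// |  1|   x|
// |  1|   y|
// +---+----+

Note that the analyzer rejects nesting one explode inside another, which is why the solution below runs one flattening pass per level of nesting.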

The following code works as expected, using '_' instead of '.' as the column-name separator:

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.{Dataset, Row}

object RuntimeError {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("FlattenSchema").getOrCreate()
    val rowTag = "idocData"
    val dataFrameReader = spark.read.option("rowTag", rowTag)
    val xmlUri = "bad_011_1.xml"
    val df = dataFrameReader.format("xml").load(xmlUri)

    val df2 = flatten(df)

  }

  def flatten(df: Dataset[Row], prefixSeparator: String = "_") : Dataset[Row] = {
    import org.apache.spark.sql.functions.{col,explode}

    // A schema still needs flattening if any field is an array or a struct.
    def mustFlatten(sc: StructType): Boolean =
      sc.fields.exists(f => f.dataType.isInstanceOf[ArrayType] || f.dataType.isInstanceOf[StructType])

    // One flattening pass: drill into structs, explode at most one array, and
    // alias every leaf column with its flattened ("_"-separated) name.
    def flattenAndExplodeOne(sc: StructType, parent: Column = null, prefix: String = null, cols: Array[(DataType,Column)] = Array[(DataType,Column)]()): Array[(DataType,Column)] = {
      val res = sc.fields.foldLeft(cols)( (columns, f) => {
        val my_col = if (parent == null) col(f.name) else parent.getItem(f.name)
        val flat_name = if (prefix == null) f.name else s"${prefix}${prefixSeparator}${f.name}"
        f.dataType match {
          case st: StructType => flattenAndExplodeOne(st, my_col, flat_name, columns)

          case dt: ArrayType => {
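            // Spark allows only one generator (explode) per select, so if an
            // array column is already part of this pass, keep this one as-is
            // and let the next pass explode it.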
            if (columns.exists(_._1.isInstanceOf[ArrayType])) {
              columns :+ ((dt,  my_col.as(flat_name)))
            } else {
              columns :+ ((dt, explode(my_col).as(flat_name)))
            }
          }
          case dt => columns :+ ((dt, my_col.as(flat_name)))
        }
      })
      res
    }

    var flatDf = df
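    // Repeat single-pass flattening until no arrays or structs remain.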
    while (mustFlatten(flatDf.schema)) {
      val newColumns = flattenAndExplodeOne(flatDf.schema, null, null).map(_._2)
      flatDf = flatDf.select(newColumns:_*)
    }

    flatDf
  }
}

The resulting df2 has the following schema and data:

df2.printSchema
root
 |-- E2EDP01008GRP_E2EDPT1001GRP_E2EDPT2001_DATAHEADERCOLUMN_DOCNUM: long (nullable = true)
 |-- E2EDP01008GRP__xmlns: string (nullable = true)


df2.show(true)
+--------------------------------------------------------------+--------------------+
|E2EDP01008GRP_E2EDPT1001GRP_E2EDPT2001_DATAHEADERCOLUMN_DOCNUM|E2EDP01008GRP__xmlns|
+--------------------------------------------------------------+--------------------+
|                                                     141036013|http://Microsoft....|
|                                                     141036013|http://Microsoft....|
+--------------------------------------------------------------+--------------------+
