“扁平化"时触发Spark AnalysisException. Spark SQL中的DataFrame [英] Spark AnalysisException when "flattening" DataFrame in Spark SQL

查看:79
本文介绍了“扁平化"时触发Spark AnalysisException. Spark SQL中的DataFrame的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用此处给出的方法在Spark SQL中展平DataFrame.这是我的代码:

I'm using the approach given here to flatten a DataFrame in Spark SQL. Here is my code:

package com.acme.etl.xml

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.{Column, SparkSession}

object RuntimeError {   def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("FlattenSchema").getOrCreate()
    val rowTag = "idocData"
    val dataFrameReader =
        spark.read
          .option("rowTag", rowTag)
    val xmlUri = "bad_011_1.xml"
    val df =
        dataFrameReader
          .format("xml")
          .load(xmlUri)
    val schema: StructType = df.schema
    val columns: Array[Column] = flattenSchema(schema)
    val df2 = df.select(columns: _*)

  }

  def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
    schema.fields.flatMap(f => {
      val colName: String = if (prefix == null) f.name else prefix + "." + f.name
      val dataType = f.dataType
      dataType match {
        case st: StructType => flattenSchema(st, colName)
        case _: StringType => Array(new org.apache.spark.sql.Column(colName))
        case _: LongType => Array(new org.apache.spark.sql.Column(colName))
        case _: DoubleType => Array(new org.apache.spark.sql.Column(colName))
        case arrayType: ArrayType => arrayType.elementType match {
          case structType: StructType => flattenSchema(structType, colName)
        }
        case _ => Array(new org.apache.spark.sql.Column(colName))
      }
    })
  }
}

在很多时候,这都可以正常工作.但是对于下面给出的XML:

Much of the time, this works fine. But for the XML given below:

<Receive xmlns="http://Microsoft.LobServices.Sap/2007/03/Idoc/3/ORDERS05/ZORDERS5/702/Receive">
    <idocData>
        <E2EDP01008GRP xmlns="http://Microsoft.LobServices.Sap/2007/03/Types/Idoc/3/ORDERS05/ZORDERS5/702">
            <E2EDPT1001GRP>
                <E2EDPT2001>
                    <DATAHEADERCOLUMN_DOCNUM>0000000141036013</DATAHEADERCOLUMN_DOCNUM>
                </E2EDPT2001>
                <E2EDPT2001>
                    <DATAHEADERCOLUMN_DOCNUM>0000000141036013</DATAHEADERCOLUMN_DOCNUM>
                </E2EDPT2001>
            </E2EDPT1001GRP>
        </E2EDP01008GRP>
        <E2EDP01008GRP xmlns="http://Microsoft.LobServices.Sap/2007/03/Types/Idoc/3/ORDERS05/ZORDERS5/702">
        </E2EDP01008GRP>
    </idocData>
</Receive>

发生此异常:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`E2EDP01008GRP`.`E2EDPT1001GRP`.`E2EDPT2001`['DATAHEADERCOLUMN_DOCNUM']' due to data type mismatch: argument 2 requires integral type, however, ''DATAHEADERCOLUMN_DOCNUM'' is of string type.;;
'Project [E2EDP01008GRP#0.E2EDPT1001GRP.E2EDPT2001[DATAHEADERCOLUMN_DOCNUM] AS DATAHEADERCOLUMN_DOCNUM#3, E2EDP01008GRP#0._VALUE AS _VALUE#4, E2EDP01008GRP#0._xmlns AS _xmlns#5]
+- Relation[E2EDP01008GRP#0] XmlRelation(<function0>,Some(/Users/paulreiners/s3/cdi-events-partition-staging/content_acme_purchase_order_json_v1/bad_011_1.xml),Map(rowtag -> idocData, path -> /Users/paulreiners/s3/cdi-events-partition-staging/content_acme_purchase_order_json_v1/bad_011_1.xml),null)

是什么原因造成的?

推荐答案

您的文档包含一个多值数组,因此您无法一次性将其完全展平,因为您不能为数组的两个元素赋予同一列姓名. 另外,在列名中使用点通常是一个坏主意,因为它很容易使Spark解析器感到困惑,并且需要始终进行转义.

Your document contains a multi-valued array so you can't flatten it completely in one pass since you can't give both elements of the array the same column name. Also, it's usually a bad idea to use a dot within a column name since it can easily confuse the Spark parser and will need to be escaped at all time.

平整此类数据集的常用方法是为数组的每个元素创建新行. 您可以使用explode函数执行此操作,但由于explode不能嵌套,因此您将需要递归调用flatten操作.

The usual way to flatten such a dataset is to create new rows for each element of the array. You can use the explode function to do this but you will need to recursively call your flatten operation because explode can't be nested.

以下代码使用'_'代替'.,按预期方式工作.作为列名分隔符:

The following code works as expected, using '_' instead of '.' as column name separator:

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.{Dataset, Row}

object RuntimeError {   

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("FlattenSchema").getOrCreate()
    val rowTag = "idocData"
    val dataFrameReader = spark.read.option("rowTag", rowTag)
    val xmlUri = "bad_011_1.xml"
    val df = dataFrameReader.format("xml").load(xmlUri)

    val df2 = flatten(df)

  }

  def flatten(df: Dataset[Row], prefixSeparator: String = "_") : Dataset[Row] = {
    import org.apache.spark.sql.functions.{col,explode}

    def mustFlatten(sc: StructType): Boolean =
      sc.fields.exists(f => f.dataType.isInstanceOf[ArrayType] || f.dataType.isInstanceOf[StructType])

    def flattenAndExplodeOne(sc: StructType, parent: Column = null, prefix: String = null, cols: Array[(DataType,Column)] = Array[(DataType,Column)]()): Array[(DataType,Column)] = {
      val res = sc.fields.foldLeft(cols)( (columns, f) => {
        val my_col = if (parent == null) col(f.name) else parent.getItem(f.name)
        val flat_name = if (prefix == null) f.name else s"${prefix}${prefixSeparator}${f.name}"
        f.dataType match {
          case st: StructType => flattenAndExplodeOne(st, my_col, flat_name, columns)

          case dt: ArrayType => {
            if (columns.exists(_._1.isInstanceOf[ArrayType])) {
              columns :+ ((dt,  my_col.as(flat_name)))
            } else {
              columns :+ ((dt, explode(my_col).as(flat_name)))
            }
          }
          case dt => columns :+ ((dt, my_col.as(flat_name)))
        }
      })
      res
    }

    var flatDf = df
    while (mustFlatten(flatDf.schema)) {
      val newColumns = flattenAndExplodeOne(flatDf.schema, null, null).map(_._2)
      flatDf = flatDf.select(newColumns:_*)
    }

    flatDf
  }
}

生成的df2具有以下架构和数据:

The resulting df2 has the following schema and data:

df2.printSchema
root
 |-- E2EDP01008GRP_E2EDPT1001GRP_E2EDPT2001_DATAHEADERCOLUMN_DOCNUM: long (nullable = true)
 |-- E2EDP01008GRP__xmlns: string (nullable = true)


df2.show(true)
+--------------------------------------------------------------+--------------------+
|E2EDP01008GRP_E2EDPT1001GRP_E2EDPT2001_DATAHEADERCOLUMN_DOCNUM|E2EDP01008GRP__xmlns|
+--------------------------------------------------------------+--------------------+
|                                                     141036013|http://Microsoft....|
|                                                     141036013|http://Microsoft....|
+--------------------------------------------------------------+--------------------+

这篇关于“扁平化"时触发Spark AnalysisException. Spark SQL中的DataFrame的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆