Extracting row tag schema from StructType in Scala to parse nested XML


Question


I'm trying to parse a wide, nested XML file into a DataFrame using the spark-xml library.

Here is an abbreviated schema definition (XSD):

<?xml version="1.0" encoding="UTF-8"?>
<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="ItemExport">
    <xs:complexType>
    <xs:sequence> 
        <xs:element name="Item">
            <xs:complexType>
            <xs:sequence>
                <xs:element name="ITEM_ID" type="xs:integer" />
                <xs:element name="CONTEXT" type="xs:string" />
                <xs:element name="TYPE" type="xs:string" />
                ...
                <xs:element name="CLASSIFICATIONS">
                    <xs:complexType>
                        <xs:sequence>
                        <xs:element maxOccurs="unbounded" name="CLASSIFICATION">
                            <xs:complexType>
                            <xs:sequence>
                                <xs:element name="CLASS_SCHEME" type="xs:string" />
                                <xs:element name="CLASS_LEVEL" type="xs:string" />
                                <xs:element name="CLASS_CODE" type="xs:string" />
                                <xs:element name="CLASS_CODE_NAME" type="xs:string" />
                                <xs:element name="EFFECTIVE_FROM" type="xs:dateTime" />
                                <xs:element name="EFFECTIVE_TO" type="xs:dateTime" />
                            </xs:sequence>
                            </xs:complexType>
                        </xs:element>
                        </xs:sequence>
                    </xs:complexType>
                </xs:element>
            </xs:sequence>
            </xs:complexType>
        </xs:element>
    </xs:sequence>
    </xs:complexType>
</xs:element>
</xs:schema>

The XML file containing the data looks something like this:

<?xml version="1.0" encoding="utf-8"?>
<ItemExport>
    <TIMEZONE>PT</TIMEZONE>
    <Item>
        <ITEM_ID>56</ITEM_ID>
        <CONTEXT>Sample</CONTEXT>
        <TYPE>Product</TYPE>
    </Item>
    ...
    <Item>
        <ITEM_ID>763</ITEM_ID>
        <CONTEXT>Sample</CONTEXT>
        <TYPE>Product</TYPE>
        <CLASSIFICATIONS>
            <CLASSIFICATION>
                <CLASS_SCHEME>AAU</CLASS_SCHEME>
                <CLASS_LEVEL>1</CLASS_LEVEL>
                <CLASS_CODE>14</CLASS_CODE>
                <CLASS_CODE_NAME>BizDev</CLASS_CODE_NAME>
                <EFFECTIVE_FROM />
                <EFFECTIVE_TO />
            </CLASSIFICATION>
        </CLASSIFICATIONS>
    </Item>
</ItemExport>

Now, what's clear is that the RowTag needs to be Item, but I've encountered an issue regarding the XSD. The row schema is encapsulated within the document schema.

import com.databricks.spark.xml.util.XSDToSchema
import com.databricks.spark.xml._
import java.nio.file.Paths
import org.apache.spark.sql.functions._

val inputFile = "dbfs:/samples/ItemExport.xml"
val schema = XSDToSchema.read(Paths.get("/dbfs/samples/ItemExport.xsd"))
val df1 = spark.read.option("rowTag", "Item").xml(inputFile)
val df2 = spark.read.schema(schema).xml(inputFile)

I basically want to get the struct under Item under the root element, not the entire document schema.

schema.printTreeString

root
|-- ItemExport: struct (nullable = false)
|    |-- Item: struct (nullable = false)
|    |    |-- ITEM_ID: integer (nullable = false)
|    |    |-- CONTEXT: string (nullable = false)
|    |    |-- TYPE: string (nullable = false)
...(a few more fields...)
|    |    |-- CLASSIFICATIONS: struct (nullable = false)
|    |    |    |-- CLASSIFICATION: array (nullable = false)
|    |    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |    |-- CLASS_SCHEME: string (nullable = false)
|    |    |    |    |    |-- CLASS_LEVEL: string (nullable = false)
|    |    |    |    |    |-- CLASS_CODE: string (nullable = false)
|    |    |    |    |    |-- CLASS_CODE_NAME: string (nullable = false)
|    |    |    |    |    |-- EFFECTIVE_FROM: timestamp (nullable = false)
|    |    |    |    |    |-- EFFECTIVE_TO: timestamp (nullable = false)

In the case above, parsing with the document schema yields an empty DataFrame:

df2.show()

+-----------+
| ItemExport|
+-----------+
+-----------+

while the inferred schema is basically correct, it can only infer nested columns when they are present (which is not always the case):

df1.show()

+----------+--------------------+----------+---------------+
|   ITEM_ID|             CONTEXT|      TYPE|CLASSIFICATIONS|
+----------+--------------------+----------+---------------+
|        56|            Sample  |   Product|         {null}|
|        57|            Sample  |   Product|         {null}|
|        59|              Part  | Component|         {null}|
|        60|              Part  | Component|         {null}|
|        61|            Sample  |   Product|         {null}|
|        62|            Sample  |   Product|         {null}|
|        63|          Assembly  |   Product|         {null}|

df1.printSchema

root
|-- ITEM_ID: long (nullable = true)
|-- CONTEXT: string (nullable = false)
|-- TYPE: string (nullable = true)
...
|-- CLASSIFICATIONS: struct (nullable = true)
|    |-- CLASSIFICATION: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- CLASS_CODE: long (nullable = true)
|    |    |    |-- CLASS_CODE_NAME: string (nullable = true)
|    |    |    |-- CLASS_LEVEL: long (nullable = true)
|    |    |    |-- CLASS_SCHEME: string (nullable = true)
|    |    |    |-- EFFECTIVE_FROM: string (nullable = true)
|    |    |    |-- EFFECTIVE_TO: string (nullable = true)

As described in the XML library docs ("Path to an XSD file that is used to validate the XML for each row individually"), I can parse into a given row-level schema as such:

import org.apache.spark.sql.types._

val structschema = StructType(
  Array(
    StructField("ITEM_ID", IntegerType, false),
    StructField("CONTEXT", StringType, false),
    StructField("TYPE", StringType, false)
  )
)

val df_struct = spark.read.schema(structschema).option("rowTag", "Item").xml(inputFile)

However, I'd like to obtain the schema for the nested columns from the XSD. How can I go about this, given the schema?

Version info: Scala 2.12, Spark 3.1.1, spark-xml 0.12.0

Solution

The columns in the XSD are declared as required (nullable = false), but some of those columns are null in the XML file. To make the schema match the XML file content, change the schema from nullable = false to nullable = true.

Try the following code.

  import com.databricks.spark.xml.util.XSDToSchema
  import com.databricks.spark.xml._
  import java.nio.file.Paths
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.functions._
  import org.apache.spark.sql.types._

  val inputFile = "dbfs:/samples/ItemExport.xml"

Get the schema from the XSD, then apply it to an empty DataFrame and select the nested Item columns to obtain the required row-level schema.

  val schema = spark
    .createDataFrame(
      spark.sparkContext.emptyRDD[Row],
      XSDToSchema.read(Paths.get("/dbfs/samples/ItemExport.xsd"))
    )
    .select("ItemExport.Item.*")
    .schema
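
Alternatively (a sketch not from the original answer, assuming the XSD maps to nested structs exactly as in the printed tree above), the same row-level schema can be pulled out by navigating the `StructType` directly, without building an empty DataFrame; `StructType`'s `apply(fieldName)` returns the named `StructField`:

```scala
import java.nio.file.Paths
import com.databricks.spark.xml.util.XSDToSchema
import org.apache.spark.sql.types.StructType

// Read the full document schema from the XSD, then descend
// root -> ItemExport -> Item and keep only the inner struct.
val docSchema = XSDToSchema.read(Paths.get("/dbfs/samples/ItemExport.xsd"))
val itemSchema = docSchema("ItemExport").dataType
  .asInstanceOf[StructType]("Item").dataType
  .asInstanceOf[StructType]
```

The casts are safe here only because the XSD guarantees that `ItemExport` and `Item` are complex types; a more defensive version would pattern-match on the `dataType`.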


  val df2 = spark.read
    .option("rootTag", "ItemExport")
    .option("rowTag", "Item")
    .schema(setNullable(schema, true)) // match the XML file content: make all columns optional, i.e. nullable = true
    .xml(inputFile)

The function below recursively marks all columns as optional, i.e. nullable = true:

  def setNullable(schema: StructType, nullable: Boolean = false): StructType = {
    def recurNullable(schema: StructType): Seq[StructField] =
      schema.fields.map{
        case StructField(name, dtype: StructType, _, meta) =>
          StructField(name, StructType(recurNullable(dtype)), nullable, meta)
        case StructField(name, dtype: ArrayType, _, meta) => dtype.elementType match {
          case struct: StructType => StructField(name, ArrayType(StructType(recurNullable(struct)), true), nullable, meta)
          case other => StructField(name, other, nullable, meta)
        }
        case StructField(name, dtype, _, meta) =>
          StructField(name, dtype, nullable, meta)
      }

    StructType(recurNullable(schema))
  }
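
As a usage sketch (hypothetical, building on the answer's `df2`), the nested `CLASSIFICATION` array can then be flattened into one row per classification; `explode_outer` keeps Items whose `CLASSIFICATIONS` element is absent:

```scala
import org.apache.spark.sql.functions._

// One output row per classification entry; Items without any
// CLASSIFICATIONS survive with null classification columns.
val flat = df2
  .select(
    col("ITEM_ID"),
    col("CONTEXT"),
    col("TYPE"),
    explode_outer(col("CLASSIFICATIONS.CLASSIFICATION")).as("cls")
  )
  .select(
    col("ITEM_ID"), col("CONTEXT"), col("TYPE"),
    col("cls.CLASS_SCHEME"), col("cls.CLASS_CODE"), col("cls.CLASS_CODE_NAME")
  )
```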
