Parsing XML columns from PySpark Dataframe using UDF


Question

I have a scenario where I have XML data in a dataframe column.

+-----------+----------+----------+--------------------+----+----------+--------+---+------------------+----------+--------------------+----+
|     county|created_at|first_name|                  id|meta|name_count|position|sex|               sid|updated_at|            visitors|year|
+-----------+----------+----------+--------------------+----+----------+--------+---+------------------+----------+--------------------+----+
|      KINGS|1574264158|      ZOEY|00000000-0000-000...| { }|        11|       0|  F|row-r9pv-p86t.ifsp|1574264158|<?xml version="1....|2007|

I want to parse the visitors column - the nested XML fields - into columns in the Dataframe using a UDF.

XML format -

<?xml version="1.0" encoding="utf-8"?>
<visitors>
  <visitor id="9615" age="68" sex="F" />
  <visitor id="1882" age="34" sex="M" />
  <visitor id="5987" age="23" sex="M" />
</visitors>

Answer

There's a section on the Databricks spark-xml Github page which talks about parsing nested xml, and it provides a solution using the Scala API, as well as a couple of Pyspark helper functions to work around the issue that there is no separate Python package for spark-xml. So using these, here's one way you could solve the problem:

# 1. Copy helper functions from https://github.com/databricks/spark-xml#pyspark-notes

from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string
import pyspark.sql.functions as F


def ext_from_xml(xml_column, schema, options={}):
    java_column = _to_java_column(xml_column.cast('string'))
    java_schema = spark._jsparkSession.parseDataType(schema.json())
    scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
    jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
        java_column, java_schema, scala_map)
    return Column(jc)

def ext_schema_of_xml_df(df, options={}):
    assert len(df.columns) == 1

    scala_options = spark._jvm.PythonUtils.toScalaMap(options)
    java_xml_module = getattr(getattr(
        spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
    java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
    return _parse_datatype_json_string(java_schema.json())
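
# Note: these helpers call into the JVM, so the spark-xml package
# (e.g. com.databricks:spark-xml_2.12) needs to be on the Spark classpath,
# for example via the --packages option when starting the session.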

# 2. Set up example dataframe

xml = '<?xml version="1.0" encoding="utf-8"?> <visitors> <visitor id="9615" age="68" sex="F" /> <visitor id="1882" age="34" sex="M" /> <visitor id="5987" age="23" sex="M" /> </visitors>'

df = spark.createDataFrame([('1',xml)],['id','visitors'])
df.show()

# +---+--------------------+
# | id|            visitors|
# +---+--------------------+
# |  1|<?xml version="1....|
# +---+--------------------+

# 3. Get xml schema and parse xml column

payloadSchema = ext_schema_of_xml_df(df.select("visitors"))
parsed = df.withColumn("parsed", ext_from_xml(F.col("visitors"), payloadSchema))
parsed.show()

# +---+--------------------+--------------------+
# | id|            visitors|              parsed|
# +---+--------------------+--------------------+
# |  1|<?xml version="1....|[[[, 68, 9615, F]...|
# +---+--------------------+--------------------+
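
# Optional sketch: inspect the inferred schema before exploding. The 'visitors'
# payload should appear as a 'visitor' array of structs, with the XML attributes
# mapped to fields prefixed with '_' (plus '_VALUE' for any element text),
# though the exact field types depend on schema inference.
parsed.printSchema()
print(payloadSchema['visitor'].dataType)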

# 4. Extract 'visitor' field from StructType
df2 = parsed.select(*parsed.columns[:-1],F.explode(F.col('parsed').getItem('visitor')))
df2.show()

# +---+--------------------+---------------+
# | id|            visitors|            col|
# +---+--------------------+---------------+
# |  1|<?xml version="1....|[, 68, 9615, F]|
# |  1|<?xml version="1....|[, 34, 1882, M]|
# |  1|<?xml version="1....|[, 23, 5987, M]|
# +---+--------------------+---------------+

# 5. Get field names, which will become new columns
# (there's probably a much better way of doing this :D)
new_col_names = [s.split(':')[0] for s in payloadSchema['visitor'].simpleString().split('<')[-1].strip('>>').split(',')]

new_col_names

# ['_VALUE', '_age', '_id', '_sex']
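
# Alternative sketch: read the field names straight from the inferred schema
# instead of parsing simpleString(); this assumes 'visitor' was inferred as an
# array of structs, as shown above.
# new_col_names = payloadSchema['visitor'].dataType.elementType.fieldNames()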

# 6. Create new columns

for c in new_col_names:
    df2 = df2.withColumn(c, F.col('col').getItem(c))
    
df2 = df2.drop('col','_VALUE')

df2.show()

# +---+--------------------+----+----+----+
# | id|            visitors|_age| _id|_sex|
# +---+--------------------+----+----+----+
# |  1|<?xml version="1....|  68|9615|   F|
# |  1|<?xml version="1....|  34|1882|   M|
# |  1|<?xml version="1....|  23|5987|   M|
# +---+--------------------+----+----+----+
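
As an aside, steps 4-6 can be collapsed into a single chain by expanding the struct with the 'col.*' syntax. This is a minimal sketch assuming the same parsed dataframe and field names as above (df2_alt is just an illustrative name):

# Hedged alternative to steps 4-6: explode the array, then expand the struct in one select.
df2_alt = (parsed
           .select('id', 'visitors', F.explode(F.col('parsed').getItem('visitor')))
           .select('id', 'visitors', 'col.*')
           .drop('_VALUE'))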

One thing to look out for is the new column names duplicating existing column names - in this case the new column names are all prefixed with underscores so we don't have any duplication, but it's probably good to check that the nested xml tags don't conflict with existing column names beforehand.
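
If you want to guard against that, here's a minimal sketch (assuming the same df and new_col_names as above) that checks for collisions before adding the columns:

# Fail fast if any nested XML field name clashes with an existing column.
conflicts = set(new_col_names) & set(df.columns)
if conflicts:
    raise ValueError(f'Nested XML fields clash with existing columns: {conflicts}')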
