Array of struct parsing in Spark dataframe


Question


I have a Dataframe with one struct type column. Sample dataframe schema is:

root
 |-- Data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: string (nullable = true)


The field name holds a column name and the field value holds a column value. The number of elements in the Data column is not fixed, so it can vary. I need to parse that data and get rid of the nested structure. (An array explode will not work in this case because all the data in one row belongs to one element.) The real schema is much bigger and has multiple array fields like Data, so my aim is to create a general solution that I can apply to similar struct arrays. Example:

Sample data:

val data = Seq(
    """{"Data": [{ "name": "FName", "value": "Alex" }, { "name": "LName",   "value": "Strong"  }]}""",
    """{"Data": [{ "name": "FName", "value": "Robert " }, { "name": "MName",   "value": "Nesta "  }, { "name": "LName",   "value": "Marley"  }]}"""
)
val df = spark.read.json(spark.sparkContext.parallelize(data))

Expected result:

+-------+------+
|  FName| LName|
+-------+------+
|   Alex|Strong|
|Robert |Marley|
+-------+------+
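The expected result above is essentially a per-row key lookup: for each row, find the struct whose name matches the wanted column and take its value. A minimal sketch in plain Scala (using a hypothetical NameValue case class in place of Spark's Row, so it runs without a SparkSession) shows the intent:

```scala
// Hypothetical stand-in for the struct elements inside the Data array.
case class NameValue(name: String, value: String)

// One row's Data array, mirroring the first sample record.
val row = Seq(NameValue("FName", "Alex"), NameValue("LName", "Strong"))

// Turning the name/value pairs into a Map makes extraction a simple key lookup.
val asMap = row.map(nv => nv.name -> nv.value).toMap
val fname = asMap.getOrElse("FName", null) // "Alex"
val lname = asMap.getOrElse("LName", null) // "Strong"
```

The same idea carries over to the Dataframe: each Data array becomes a map from name to value, and each wanted output column is one lookup into that map.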
 


As a solution I have created a UDF which I execute on the whole Data column. As input parameters I pass the column and the name of the field I want to extract.

val find_scheme_name_in_array = udf { (arr: Seq[Row], columnName: String) => {
    var value = ""
    arr.foreach(el =>
        if (el.getAs[String]("name") == columnName) {
            value = el.getAs[String]("value")
        }
    )
    value
}}


The problem is that I am using the variable value to store an intermediate result, and I don't want to create a new variable for each row on which my UDF is executed.


This is how I execute my UDF (the query produces the expected result):

df.select(
    find_scheme_name_in_array(col("Data"), lit("FName")).as("FName"),
    find_scheme_name_in_array(col("Data"), lit("LName")).as("LName")
).show()


I would be happy to hear any comments on how I can improve the UDF's logic, as well as different ways of solving the parsing issue.

Answer


I have solved the issue by replacing the foreach loop with the find method:

val find_scheme_name_in_array = udf { (arr: Seq[Row], columnName: String) =>
    arr.find(_.getAs[String]("name") == columnName) match {
        case Some(i) => i.getAs[String]("value")
        case None => null
    }
}
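As a side note, the find-plus-pattern-match can be collapsed into a single collectFirst call, which combines the search and the extraction in one step. A sketch of the equivalent lookup logic (shown over a hypothetical NameValue case class rather than Spark's Row, so it runs without a SparkSession):

```scala
// Hypothetical stand-in for the struct elements inside the Data array.
case class NameValue(name: String, value: String)

// collectFirst stops at the first matching element and extracts its value;
// an Option models the "not found" case instead of a mutable default.
def findValue(arr: Seq[NameValue], columnName: String): Option[String] =
  arr.collectFirst { case nv if nv.name == columnName => nv.value }

val data = Seq(NameValue("FName", "Alex"), NameValue("LName", "Strong"))
```

Inside the actual UDF the same shape applies to Seq[Row] with getAs, and appending .orNull preserves the null-on-missing behaviour of the answer above.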

