Defining a UDF that accepts an Array of objects in a Spark DataFrame?
Question
When working with Spark's DataFrames, User Defined Functions (UDFs) are required for mapping data in columns. UDFs require that argument types are explicitly specified. In my case, I need to manipulate a column that is made up of arrays of objects, and I do not know what type to use. Here's an example:
import sqlContext.implicits._
// Start with some data. Each row (here, there's only one row)
// is a topic and a bunch of subjects
// stripMargin removes the leading | characters so the string is valid JSON
val data = sqlContext.read.json(sc.parallelize(Seq(
  """
    |{
    |  "topic" : "pets",
    |  "subjects" : [
    |    {"type" : "cat", "score" : 10},
    |    {"type" : "dog", "score" : 1}
    |  ]
    |}
  """.stripMargin)))
It's relatively straightforward to use the built-in org.apache.spark.sql.functions to perform basic operations on the data in the columns:
import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show
+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets|             2|
+-----+--------------+
It's also generally easy to write custom UDFs to perform arbitrary operations:
import org.apache.spark.sql.functions.udf
val enhance = udf { topic : String => topic.toUpperCase() }
data.select(enhance($"topic"), size($"subjects")).show
+----------+--------------+
|UDF(topic)|size(subjects)|
+----------+--------------+
|      PETS|             2|
+----------+--------------+
But what if I want to use a UDF to manipulate the array of objects in the "subjects" column? What type do I use for the argument in the UDF? For example, suppose I want to reimplement the size function instead of using the one provided by Spark:
val my_size = udf { subjects: Array[Something] => subjects.size }
data.select($"topic", my_size($"subjects")).show
Clearly Array[Something] does not work... what type should I use!? Should I ditch Array[] altogether? Poking around tells me scala.collection.mutable.WrappedArray may have something to do with it, but there's still another type I need to provide.
Recommended answer
What you're looking for is Seq[o.a.s.sql.Row] (that is, Seq[org.apache.spark.sql.Row]):
import org.apache.spark.sql.Row
val my_size = udf { subjects: Seq[Row] => subjects.size }
Explanation:
- The current representation of ArrayType is, as you already know, WrappedArray, so Array won't work. It is better to stay on the safe side and declare the more general Seq.
- According to the official specification, the local (external) type for StructType is Row. Unfortunately, this means that access to the individual fields is not type safe.
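Because each array element arrives as a Row, individual fields have to be read by name or position with an explicit type. The following is only a sketch that continues the shell session from the question: total_score is a hypothetical name, and it assumes the "score" field was inferred as a long (LongType) from the JSON and that neither the array nor the field is null.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Hypothetical UDF: sums the "score" field over all structs in the array.
// getAs is not checked at compile time; a wrong field name or type only
// fails when the UDF actually runs.
val total_score = udf { subjects: Seq[Row] =>
  subjects.map(_.getAs[Long]("score")).sum
}

data.select($"topic", total_score($"subjects")).show
```

If the field type is uncertain, data.printSchema shows what Spark inferred for each element of "subjects".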
Notes:
To create a struct in Spark < 2.3, the function passed to udf has to return a Product type (Tuple* or case class), not Row. That's because the corresponding udf variants depend on Scala reflection:
Defines a Scala closure of n arguments as user-defined function (UDF). The data types are automatically inferred based on the Scala closure's signature.
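As an illustration of the Product-based approach, a UDF can return a case class and Spark will infer the struct schema from it. This is a rough sketch under a few assumptions: Summary and summarize are hypothetical names, the code continues the shell session from the question, and "score" is assumed to be a non-null long.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Hypothetical result type. Its fields become the fields of the struct column.
case class Summary(count: Int, totalScore: Long)

// The return type is a Product, so the output schema is inferred by reflection.
val summarize = udf { subjects: Seq[Row] =>
  Summary(subjects.size, subjects.map(_.getAs[Long]("score")).sum)
}

data.select($"topic", summarize($"subjects").alias("summary")).printSchema
```

Returning a tuple such as (Int, Long) should work the same way, except that the struct fields are then named _1 and _2.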
In Spark >= 2.3 it is possible to return Row directly, as long as the schema is provided:
def udf(f: AnyRef, dataType: DataType): UserDefinedFunction
Defines a deterministic user-defined function (UDF) using a Scala closure. For this variant, the caller must specify the output data type, and there is no automatic input type coercion.
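A sketch of how this variant might be used to return a Row with an explicit schema is shown below. The names summarySchema, summarizeFn and summarizeRow are hypothetical, the code continues the shell session from the question, and "score" is again assumed to be a non-null long. Note that, as far as I know, Spark 3.0 and later reject this untyped variant by default unless spark.sql.legacy.allowUntypedScalaUDF is set to true, so treat this as applying mainly to Spark 2.3 and 2.4.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._

// Output schema must be given explicitly because Row carries no static type.
val summarySchema = StructType(Seq(
  StructField("count", IntegerType, nullable = false),
  StructField("total_score", LongType, nullable = false)))

// Binding the closure to a val with a function type keeps the call below
// unambiguous: only the (AnyRef, DataType) overload accepts it.
val summarizeFn: Seq[Row] => Row = subjects =>
  Row(subjects.size, subjects.map(_.getAs[Long]("score")).sum)

val summarizeRow = udf(summarizeFn, summarySchema)

data.select($"topic", summarizeRow($"subjects").alias("summary")).printSchema
```

The values placed in the returned Row have to match summarySchema in order and type, since nothing checks this at compile time.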
See, for example, How to create a Spark UDF in Java / Kotlin which returns a complex type?.