Apache Spark UDF that returns dynamic data types
Question
I have a UDF that processes JSON and returns dynamic data results per row. In my case I need it to validate the data and return the validated data.
The schema is flexible for each row, which means I cannot create a case class for every case (some of my data can be nested).
I've tried to return a tuple from my UDF, but I had no luck there either (because I needed to convert from a list to a tuple), and I didn't find an elegant solution for that.
The data types that I'm returning are String, Integer, Double, and DateTime, in a different order per row.
I've tried to use map on the DataFrame, but I'm having issues with my schema.
import spark.implicits._

def processData(row_type: String) = {
  /*
  Completely random output here: a Tuple/List/Array of
  elements with types Integer, String, Double, DateType.
  */
  // pseudo-code starts here
  if (row_type == "A")
    (1, "second", 3)
  else
    (1, "second", 3, 4)
}
val processDataUDF = udf((row_type: String) => processData(row_type))
val df = Seq((0, 1), (1, 2)).toDF("a", "b")
val df2 = df.select(processDataUDF($"a"))
df2.show(5)
df2.printSchema()
Result
+------------+
| UDF(a)|
+------------+
|[1,second,3]|
|[1,second,3]|
+------------+
How should I approach this problem? We have different processing results per row_type. All the row_types are set dynamically. I can create a Schema for each row_type, but I cannot make the same UDF return results with different schemas.
Is using map the only approach here?
Answer
A Spark Dataset is a columnar data structure, and there is really no place for a flexible schema here. The schema has to be homogeneous (all rows must have the same general structure) and known upfront (if you use a UDF, it has to return a well-defined SQL type).
You can achieve some flexibility by:
- Defining a schema which represents a superset of all possible fields and marking individual columns as nullable. This is possible only if there are no type conflicts (if a Row contains a field foo, it is always represented using the same SQL type).
- Using collection types (MapType, ArrayType) to represent fields of variable size. All values and/or keys have to be of the same type.
- Reshaping the raw data to the point where it is actually representable with a fixed schema. Spark includes json4s as a dependency, which provides a set of tools for merging, diffing, and querying JSON data; it can be used to apply relatively complex transformations if needed.
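The first option can be sketched in plain Scala: a single case class whose fields are a superset of every variant, with Option wrapping the fields that some row_types omit. The names (ParsedRow, label, extra, …) are illustrative, not from the question; when a Dataset is built from such a case class, the Option fields become nullable columns, so both variants fit one fixed schema.

```scala
// Hypothetical superset record: every field any row_type can produce;
// Option covers the variants that omit a field.
case class ParsedRow(
  id:    Option[Int],
  label: Option[String],
  score: Option[Int],
  extra: Option[Int] // only present for non-"A" rows
)

def processData(rowType: String): ParsedRow =
  if (rowType == "A")
    ParsedRow(Some(1), Some("second"), Some(3), None)    // 3-field variant
  else
    ParsedRow(Some(1), Some("second"), Some(3), Some(4)) // 4-field variant

// With Spark in scope this becomes a UDF with one fixed, nullable schema:
//   val processDataUDF = udf(processData _)
```

This only works because the shared fields keep the same SQL type in every variant, per the type-conflict caveat above.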
If this is not practical, I would recommend keeping the JSON field "as is" and parsing it only on demand to extract specific values. You can use get_json_object and explicit type casting. This allows for testing different scenarios:
coalesce(Seq("$.bar", "$.foo.bar", "$.foobar.foo.bar")
  .map(get_json_object($"json_col", _)): _*).cast(DoubleType)
without assuming a single document structure.
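The fallback semantics of that expression can be illustrated without Spark: try each candidate path against a document and keep the first value that resolves. This sketch models JSON as nested Maps purely for illustration (the Json alias and helper names are assumptions); in Spark the same effect comes from coalesce over several get_json_object columns.

```scala
// Toy stand-in for a parsed JSON document: nested maps instead of a real AST.
type Json = Map[String, Any]

// Walk a "$.a.b.c"-style path through nested maps; None if any step is missing
// or the leaf is not a Double.
def extract(doc: Json, path: String): Option[Double] = {
  val steps = path.stripPrefix("$.").split('.').toList
  steps.foldLeft(Option[Any](doc)) {
    case (Some(m: Map[_, _]), key) => m.asInstanceOf[Json].get(key)
    case _                         => None
  }.collect { case d: Double => d }
}

// First path that resolves wins -- the coalesce-over-paths semantics.
def firstHit(doc: Json, paths: Seq[String]): Option[Double] =
  paths.iterator.map(extract(doc, _)).collectFirst { case Some(v) => v }
```

The point is that nothing about the document's overall shape needs to be fixed up front; only the paths you probe for must be named.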
You can get a bit more flexibility with binary Encoders (Encoders.kryo, Encoders.java) or the RDD API, which can be used to store union types (or even Any), but if you really expect completely random output, that suggests a serious design or data-modeling problem. Even if you can store the parsed data, it will be really hard to work with.
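The union-type route warned about above can be sketched with a sealed trait (the names here are illustrative). It compiles, and such values could be held via a binary encoder like Encoders.kryo, but every downstream consumer must pattern-match on all variants, which is exactly the "hard to work with" cost:

```scala
// Illustrative union of the possible per-row results.
sealed trait RowResult
final case class ThreeFields(a: Int, b: String, c: Int)        extends RowResult
final case class FourFields(a: Int, b: String, c: Int, d: Int) extends RowResult

def processData(rowType: String): RowResult =
  if (rowType == "A") ThreeFields(1, "second", 3)
  else                FourFields(1, "second", 3, 4)

// Every downstream use pays the matching tax:
def fieldCount(r: RowResult): Int = r match {
  case _: ThreeFields => 3
  case _: FourFields  => 4
}

// In Spark such a Dataset could only be held with a binary encoder, e.g.:
//   implicit val enc = org.apache.spark.sql.Encoders.kryo[RowResult]
```

Note that a Kryo-encoded column is an opaque binary blob to Spark SQL: no column pruning, no predicate pushdown, no direct SQL access to the fields.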