Apache Spark UDF that returns dynamic data types

Problem Description

I have a UDF that processes JSON and returns dynamic data results per row. In my case I need it to validate data and return the validated data.

The schema is flexible for each row, which means I cannot create a case class for every case (some of my data can be nested).

I've tried to return a tuple from my UDF function, but I had no luck there either (because I needed to convert from a list to a tuple), and I didn't find an elegant solution for that.

The data types I'm returning are String, Integer, Double and DateTime, in varying order.

I've tried to use map on the DataFrame, but I'm having issues with my schema.

import spark.implicits._
import org.apache.spark.sql.functions.udf

def processData(row_type: String) = {
  /*
  Completely random output here: a Tuple/List/Array of
  elements with types Integer, String, Double, DateType.
  */

  // pseudo-code starts here: the two branches return tuples of
  // different arity, which a single strongly typed UDF cannot express

  if (row_type == "A")
    (1, "second", 3)
  else
    (1, "second", 3, 4)
}

val processDataUDF = udf((row_type: String) => processData(row_type))

val df = Seq((0, 1), (1, 2)).toDF("a", "b")
val df2 = df.select(processDataUDF($"a"))
df2.show(5)
df2.printSchema()

Results

+------------+
|      UDF(a)|
+------------+
|[1,second,3]|
|[1,second,3]|
+------------+

How should I approach this problem? We have different processing results per row_type, and all the row_type values are set dynamically. I can create a Schema for each row_type, but I cannot make the same UDF return results with different schemas.

Is using map the only approach here?

Recommended Answer

A Spark Dataset is a columnar data structure and there is really no place for a flexible schema here. The schema has to be homogeneous (all rows must have the same general structure) and known upfront (if you use a UDF, it has to return a well-defined SQL type).

You can achieve some flexibility by:

  • Defining a schema that represents a superset of all possible fields and marking individual columns as nullable. This is possible only if there are no type conflicts (if a Row contains a field foo, it is always represented using the same SQL type). A sketch of this approach follows the list.
  • Using collection types (MapType, ArrayType) to represent fields of variable size. All values and/or keys must be of the same type.
  • Reshaping the raw data to the point where it is actually representable with a fixed schema. Spark includes json4s as a dependency, which provides a set of tools for merging, diffing and querying JSON data. It can be used to apply relatively complex transformations if needed.
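
As a rough sketch of the first option, the hypothetical Processed case class below (its name and fields are made up for illustration) widens the two row shapes from the question into one superset schema, with the extra field modeled as a nullable Option:

import spark.implicits._
import org.apache.spark.sql.functions.udf

// Hypothetical superset of both row shapes; the fourth field exists
// only for non-"A" rows, so it is modeled as a nullable Option
case class Processed(a: Int, b: String, c: Int, d: Option[Int])

val processDataUDF = udf((row_type: String) =>
  if (row_type == "A") Processed(1, "second", 3, None)
  else Processed(1, "second", 3, Some(4))
)

val data = Seq(("A", 1), ("B", 2)).toDF("row_type", "b")
data.select(processDataUDF($"row_type").as("processed")).printSchema()
// root
//  |-- processed: struct (nullable = true)
//  |    |-- a: integer (nullable = false)
//  |    |-- b: string (nullable = true)
//  |    |-- c: integer (nullable = false)
//  |    |-- d: integer (nullable = true)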

If this is not practical, I would recommend keeping the JSON field as is and parsing it only on demand to extract specific values. You can use get_json_object and explicit type casting. This allows you to test different scenarios:

coalesce(Seq("$.bar", "$.foo.bar", "$.foobar.foo.bar")
  .map(get_json_object($"json_col", _)): _*).cast(DoubleType)

without assuming a single document structure.
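
For instance, with made-up documents that store the value of interest under different paths, the expression above picks the first path that matches:

import spark.implicits._
import org.apache.spark.sql.functions.{coalesce, get_json_object}
import org.apache.spark.sql.types.DoubleType

// Made-up documents that keep the same value at different paths
val jsonDf = Seq(
  """{"bar": 1.0}""",
  """{"foo": {"bar": 2.0}}""",
  """{"foobar": {"foo": {"bar": 3.0}}}"""
).toDF("json_col")

// Try each path in order; the first non-null match wins
val bar = coalesce(Seq("$.bar", "$.foo.bar", "$.foobar.foo.bar")
  .map(get_json_object($"json_col", _)): _*).cast(DoubleType)

jsonDf.select(bar.as("bar")).show()
// +---+
// |bar|
// +---+
// |1.0|
// |2.0|
// |3.0|
// +---+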

You can get a bit more flexibility with binary Encoders (Encoders.kryo, Encoders.java) or the RDD API, which can be used to store union types (or even Any), but if you really expect completely random output, it suggests a serious design or data modeling problem. Even if you can store the parsed data, it will be really hard to work with.
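
A minimal sketch of the Kryo route, assuming an existing spark session and placeholder values: every record collapses into a single opaque binary column, which is exactly why it is hard to work with afterwards:

import org.apache.spark.sql.{Dataset, Encoder, Encoders}

// Binary encoder for Any: each record is Kryo-serialized as a whole
implicit val anyEncoder: Encoder[Any] = Encoders.kryo[Any]

val ds: Dataset[Any] = spark.createDataset(Seq[Any](
  (1, "second", 3),   // one row shape
  (1, "second", 3, 4) // another row shape
))

ds.printSchema()
// root
//  |-- value: binary (nullable = true)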
