Apache Spark UDF that returns dynamic data types


Question

I have a UDF that processes JSON and returns dynamic data results per row. In my case I need this to validate data and return the validated data.

The schema is flexible for each row, which means I cannot create a case class for every case (and some of my data can be nested).

I've tried to return a tuple from my UDF function, but I had no luck there either (I needed to convert from a list to a tuple), and I didn't find an elegant solution for that.

The data types I'm returning are String, Integer, Double and DateTime, in varying order.

I've tried to use map on the DataFrame, but I'm having issues with my schema.

import org.apache.spark.sql.functions.udf
import spark.implicits._

def processData(row_type: String) = {
  /*
  Completely random output here: a Tuple/List/Array of
  elements with types Integer, String, Double, DateType.
  */

  // pseudo-code starts here; the two branches return tuples of
  // different arity, which is exactly what I cannot get Spark to encode
  if (row_type == "A")
    (1, "second", 3)
  else
    (1, "second", 3, 4)
}

val processDataUDF = udf((row_type: String) => processData(row_type))

val df = Seq((0, 1), (1, 2)).toDF("a", "b")
val df2 = df.select(processDataUDF($"a"))
df2.show(5)
df2.printSchema()

Result

+------------+
|      UDF(a)|
+------------+
|[1,second,3]|
|[1,second,3]|
+------------+

How should I approach this problem? We have different processing results per row_type, and all of the row_types are set dynamically. I can create a schema for each row_type, but I cannot make the same UDF return results with different schemas.

Is using map the only approach here?

Answer

Spark Dataset is a columnar data structure and there is really no place for a flexible schema here. The schema has to be homogeneous (all rows must have the same general structure) and known upfront (if you use a UDF, it has to return a well-defined SQL type).

You can achieve some flexibility by:

  • Defining a schema which represents a superset of all possible fields and marking individual columns as nullable. This is possible only if there are no type conflicts (if a Row contains a field foo, it is always represented by the same SQL type); see the sketch after this list.
  • Using collection types (MapType, ArrayType) to represent fields of variable size. All values and/or keys have to be of the same type.
  • Reshaping the raw data to the point where it is actually representable with a fixed schema. Spark includes json4s as a dependency, which provides a set of tools for merging, diffing and querying JSON data; it can be used to apply relatively complex transformations if needed.
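
A minimal sketch of the first option (the field names, sample JSON and supersetSchema value are hypothetical, not from the question; from_json requires Spark 2.1+): rows missing a field simply get null under one nullable superset schema.

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._  // assumes a SparkSession named spark is in scope

// Hypothetical superset of every field that can appear; all nullable.
val supersetSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("score", DoubleType, nullable = true)
))

val raw = Seq(
  """{"id": 1, "name": "second"}""",
  """{"id": 2, "name": "third", "score": 4.0}"""
).toDF("json_col")

// Rows that lack a field get null in the corresponding column.
raw.select(from_json($"json_col", supersetSchema).alias("data"))
  .select("data.*")
  .show()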

If this is not practical, I would recommend keeping the JSON field "as is" and parsing it only on demand to extract specific values. You can use get_json_object with explicit type casting. This allows you to test different scenarios:

coalesce(Seq("$.bar", "$.foo.bar", "$.foobar.foo.bar")
  .map(get_json_object($"json_col", _)): _*).cast(DoubleType)

without assuming a single document structure.
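
For instance, a self-contained version of that snippet might look like the following (json_col and the JSON paths come from the example above; the sample rows are hypothetical):

import org.apache.spark.sql.functions.{coalesce, get_json_object}
import org.apache.spark.sql.types.DoubleType
import spark.implicits._

val jsonDf = Seq(
  """{"bar": 1.0}""",
  """{"foo": {"bar": 2.0}}"""
).toDF("json_col")

// Try each candidate path in turn; the first non-null match wins.
val bar = coalesce(Seq("$.bar", "$.foo.bar", "$.foobar.foo.bar")
  .map(get_json_object($"json_col", _)): _*).cast(DoubleType)

jsonDf.select(bar.alias("bar")).show()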

You can get a bit more flexibility with binary Encoders (Encoders.kryo, Encoders.java) or the RDD API, which can be used to store union types (or even Any), but if you really expect completely random output, it suggests a serious design or data modeling problem. Even if you can store the parsed data, it will be really hard to work with.
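
A minimal sketch of the binary-encoder route (assuming a SparkSession named spark; the payload values are arbitrary):

import org.apache.spark.sql.{Encoder, Encoders}

// Kryo serializes each value into a single opaque binary column,
// so mixed shapes can be stored but Spark SQL cannot query inside them.
implicit val anyEncoder: Encoder[Any] = Encoders.kryo[Any]

val ds = spark.createDataset(Seq[Any](
  (1, "second", 3),
  (1, "second", 3, 4)
))
ds.printSchema()  // root |-- value: binary (nullable = true)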

