How to create a Spark UDF in Java / Kotlin which returns a complex type?


Question

I'm trying to write a UDF which returns a complex type:

private val toPrice = UDF1<String, Map<String, String>> { s ->
    val elements = s.split(" ")
    mapOf("value" to elements[0], "currency" to elements[1])
}


val type = DataTypes.createStructType(listOf(
        DataTypes.createStructField("value", DataTypes.StringType, false),
        DataTypes.createStructField("currency", DataTypes.StringType, false)))
df.sqlContext().udf().register("toPrice", toPrice, type)

But whenever I use it:

df = df.withColumn("price", callUDF("toPrice", col("price")))

I get a cryptic error:

Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$28: (string) => struct<value:string,currency:string>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: {value=138.0, currency=USD} (of class java.util.LinkedHashMap)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:236)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:231)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:379)
    ... 19 more

I tried to use a custom data type:

class Price(val value: Double, val currency: String) : Serializable

with a UDF which returns that type:

private val toPrice = UDF1<String, Price> { s ->
    val elements = s.split(" ")
    Price(elements[0].toDouble(), elements[1])
}

but then I get another MatchError, this time complaining about the Price type.

What is the proper way to write a UDF which can return a complex type?

Answer

TL;DR The function should return an object of class org.apache.spark.sql.Row.

Spark provides two main variants of UDF definitions.

  1. udf variants using Scala reflection:

  • def udf[RT](f: () ⇒ RT)(implicit arg0: TypeTag[RT]): UserDefinedFunction
  • def udf[RT, A1](f: (A1) ⇒ RT)(implicit arg0: TypeTag[RT], arg1: TypeTag[A1]): UserDefinedFunction
  • ...
  • def udf[RT, A1, A2, ..., A10](f: (A1, A2, ..., A10) ⇒ RT)(implicit arg0: TypeTag[RT], arg1: TypeTag[A1], arg2: TypeTag[A2], ..., arg10: TypeTag[A10]): UserDefinedFunction

which define a

Scala closure of ... arguments as user-defined function (UDF). The data types are automatically inferred based on the Scala closure's signature.

These variants are used without a schema, with atomic or algebraic data types. For example, the function in question would be defined in Scala as:

case class Price(value: Double, currency: String) 

val df = Seq("1 USD").toDF("price")

val toPrice = udf((s: String) => scala.util.Try { 
  s split(" ") match {
    case Array(price, currency) => Price(price.toDouble, currency)
  }
}.toOption)

df.select(toPrice($"price")).show
// +----------+
// |UDF(price)|
// +----------+
// |[1.0, USD]|
// +----------+

In this variant the return type is automatically encoded.

Due to its dependence on reflection, this variant is intended primarily for Scala users.

  2. udf variants providing a schema definition (the one you use here). The return type for this variant should be the same as for Dataset[Row]:

  • As pointed out in the other answer, you can use only the types listed in the SQL types mapping table (atomic types either boxed or unboxed, java.sql.Timestamp / java.sql.Date, as well as high level collections).

Complex structures (structs / StructTypes) are expressed using org.apache.spark.sql.Row. No mixing with algebraic data types or equivalents is allowed. For example (Scala code)

struct<_1:int,_2:struct<_1:string,_2:struct<_1:double,_2:int>>>

should be expressed as

Row(1, Row("foo", Row(-1.0, 42)))

not

(1, ("foo", (-1.0, 42)))

or any mixed variant, like

Row(1, Row("foo", (-1.0, 42)))
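
The same nesting can be built from Java or Kotlin with RowFactory. A minimal Kotlin sketch, using the literal values from the example above:

import org.apache.spark.sql.RowFactory

// equivalent of Row(1, Row("foo", Row(-1.0, 42)))
val nested = RowFactory.create(1, RowFactory.create("foo", RowFactory.create(-1.0, 42)))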

This variant is provided primarily to ensure Java interoperability.

In this case (equivalent to the one in the question) the definition should be similar to the following one:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row


val schema = StructType(Seq(
  StructField("value", DoubleType, false),
  StructField("currency", StringType, false)
))

val toPrice = udf((s: String) => scala.util.Try { 
  s split(" ") match {
    case Array(price, currency) => Row(price.toDouble, currency)
  }
}.getOrElse(null), schema)

df.select(toPrice($"price")).show
// +----------+
// |UDF(price)|
// +----------+
// |[1.0, USD]|
// |      null|
// +----------+

Excluding all the nuances of exception handling (in general, UDFs should control for null input and by convention gracefully handle malformed data), the Java equivalent should look more or less like this:

UserDefinedFunction price = udf((String s) -> {
    String[] split = s.split(" ");
    return RowFactory.create(Double.parseDouble(split[0]), split[1]);
}, DataTypes.createStructType(new StructField[]{
    DataTypes.createStructField("value", DataTypes.DoubleType, true),
    DataTypes.createStructField("currency", DataTypes.StringType, true)
}));
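
Since the question itself uses Kotlin, here is a minimal Kotlin sketch of the same fix; it reuses the df and registration style from the question and, like the Java version above, omits error handling:

import org.apache.spark.sql.Row
import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.types.DataTypes

// Return a Row matching the declared StructType instead of a Map or a custom class
private val toPrice = UDF1<String, Row> { s ->
    val elements = s.split(" ")
    RowFactory.create(elements[0].toDouble(), elements[1])
}

val priceType = DataTypes.createStructType(listOf(
        DataTypes.createStructField("value", DataTypes.DoubleType, false),
        DataTypes.createStructField("currency", DataTypes.StringType, false)))

// Registered as in the question; callUDF("toPrice", col("price")) then works as expected
df.sqlContext().udf().register("toPrice", toPrice, priceType)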

Context:

To give you some context, this distinction is reflected in other parts of the API as well. For example, you can create a DataFrame from a schema and a sequence of Rows:

def createDataFrame(rows: List[Row], schema: StructType): DataFrame 

or from a sequence of Products:

def createDataFrame[A <: Product](data: Seq[A])(implicit arg0: TypeTag[A]): DataFrame 

but mixed variants are not supported.

In other words, you should provide input that can be encoded using RowEncoder.
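
As a small illustration of the Row-based variant from Kotlin (the SparkSession variable spark and the sample values are just assumptions for this sketch), each Row must line up with the schema, exactly like a UDF returning a struct:

import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.types.DataTypes

val priceSchema = DataTypes.createStructType(listOf(
        DataTypes.createStructField("value", DataTypes.DoubleType, false),
        DataTypes.createStructField("currency", DataTypes.StringType, false)))

// Each Row must match the schema field by field
val rows = listOf(
        RowFactory.create(1.0, "USD"),
        RowFactory.create(2.5, "EUR"))

val pricesDf = spark.createDataFrame(rows, priceSchema)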

Of course you wouldn't normally use udf for a task like this one:

import org.apache.spark.sql.functions._

df.withColumn("price", struct(
  split($"price", " ")(0).cast("double").alias("price"),
  split($"price", " ")(1).alias("currency")
))
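
A rough Kotlin counterpart of that expression, using the same built-in functions (here the first field is aliased as value to match the schema used earlier):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.split
import org.apache.spark.sql.functions.struct

// Build the struct with built-in functions instead of a UDF
df.withColumn("price", struct(
        split(col("price"), " ").getItem(0).cast("double").alias("value"),
        split(col("price"), " ").getItem(1).alias("currency")))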
