在 pyspark 中使用 Scala 类作为 UDF [英] Using Scala classes as UDF with pyspark

查看:28
本文介绍了在 pyspark 中使用 Scala 类作为 UDF的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试在使用 Apache Spark 时将一些计算从 Python 卸载到 Scala.我想使用 Java 的类接口来使用持久变量,就像这样(这是一个基于我更复杂用例的无意义的 MWE):

I'm trying to offload some computations from Python to Scala when using Apache Spark. I would like to use the class interface from Java to be able to use a persistent variable, like so (this is a nonsensical MWE based on my more complex use case):

package mwe

import org.apache.spark.sql.api.java.UDF1

class SomeFun extends UDF1[Int, Int] {
  private var prop: Int = 0

  override def call(input: Int): Int = {
    if (prop == 0) {
      prop = input
    }
    prop + input
  }
}

现在我正在尝试在 pyspark 中使用这个类:

Now I'm attempting to use this class from within pyspark:

import pyspark
from pyspark.sql import SQLContext
from pyspark import SparkContext

conf = pyspark.SparkConf()
conf.set("spark.jars", "mwe.jar")
sc = SparkContext.getOrCreate(conf)

sqlContext = SQLContext.getOrCreate(sc)
sqlContext.registerJavaFunction("fun", "mwe.SomeFun")

df0 = sc.parallelize((i,) for i in range(6)).toDF(["num"])
df1 = df0.selectExpr("fun(num) + 3 as new_num")
df1.show()

并得到以下异常:

pyspark.sql.utils.AnalysisException: u"cannot resolve '(UDF:fun(num) + 3)' due to data type mismatch: differing types in '(UDF:fun(num) + 3)' (struct<> and int).; line 1 pos 0;\n'Project [(UDF:fun(num#0L) + 3) AS new_num#2]\n+- AnalysisBarrier\n      +- LogicalRDD [num#0L], false\n"

实现这一点的正确方法是什么?我是否必须使用 Java 本身来上课?我非常感谢提示!

What is the correct way to implement this? Will I have to resort to Java itself for the class? I'd greatly appreciate hints!

推荐答案

异常的根源是使用了不兼容的类型:

The source of the exception is usage of incompatible types:

  • 首先 oassql.api.java.UDF* 对象需要外部 Java(不是 Scala 类型),因此需要整数的 UDF 应该采用装箱的 Integer(java.lang.Integer) 不是 Int.

  • First of all o.a.s.sql.api.java.UDF* objects require external Java (not Scala types), so UDF expecting integers should take boxed Integer (java.lang.Integer) not Int.

class SomeFun extends UDF1[Integer, Integer] {
  ...
  override def call(input: Integer): Integer = {
    ...

  • 除非您使用旧的 Python num 列使用 LongType 而不是 IntegerType:

  • Unless you use legacy Python num column uses of LongType not IntegerType:

    df0.printSchema()
    root
     |-- num: long (nullable = true)
    

    所以实际签名应该是

    class SomeFun extends UDF1[java.lang.Long, java.lang.Long] {
      ...
      override def call(input: java.lang.Long): java.lang.Long = {
        ...
    

    或者数据应该在应用UDF之前进行转换

    or data should be casted before applying UDF

    df0.selectExpr("fun(cast(num as integer)) + 3 as new_num")
    

  • 最后,UDF 中不允许可变状态.它不会导致异常,但整体行为将是不确定的.

    Finally mutable state is not allowed in UDFs. It won't cause an exception but overall behavior will be non-deterministic.

    这篇关于在 pyspark 中使用 Scala 类作为 UDF的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆