Using Scala classes as UDF with pyspark
Question
I'm trying to offload some computations from Python to Scala when using Apache Spark. I would like to use the class interface from Java to be able to use a persistent variable, like so (this is a nonsensical MWE based on my more complex use case):
package mwe

import org.apache.spark.sql.api.java.UDF1

class SomeFun extends UDF1[Int, Int] {
  private var prop: Int = 0

  override def call(input: Int): Int = {
    if (prop == 0) {
      prop = input
    }
    prop + input
  }
}
Now I'm attempting to use this class from within pyspark:
import pyspark
from pyspark.sql import SQLContext
from pyspark import SparkContext
conf = pyspark.SparkConf()
conf.set("spark.jars", "mwe.jar")
sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext.getOrCreate(sc)
sqlContext.registerJavaFunction("fun", "mwe.SomeFun")
df0 = sc.parallelize((i,) for i in range(6)).toDF(["num"])
df1 = df0.selectExpr("fun(num) + 3 as new_num")
df1.show()
and get the following exception:
pyspark.sql.utils.AnalysisException: u"cannot resolve '(UDF:fun(num) + 3)' due to data type mismatch: differing types in '(UDF:fun(num) + 3)' (struct<> and int).; line 1 pos 0;\n'Project [(UDF:fun(num#0L) + 3) AS new_num#2]\n+- AnalysisBarrier\n +- LogicalRDD [num#0L], false\n"
What is the correct way to implement this? Will I have to resort to Java itself for the class? I'd greatly appreciate hints!
Answer
The source of the exception is usage of incompatible types:
First of all, o.a.s.sql.api.java.UDF* objects require external Java types (not Scala types), so a UDF expecting integers should take a boxed Integer (java.lang.Integer), not Int:
class SomeFun extends UDF1[Integer, Integer] {
  ...
  override def call(input: Integer): Integer = {
    ...
Unless you use legacy Python, the num column uses LongType, not IntegerType:
df0.printSchema()

root
 |-- num: long (nullable = true)
So the actual signature should be:
class SomeFun extends UDF1[java.lang.Long, java.lang.Long] {
  ...
  override def call(input: java.lang.Long): java.lang.Long = {
    ...
or the data should be cast before applying the UDF:
df0.selectExpr("fun(cast(num as integer)) + 3 as new_num")
Finally, mutable state is not allowed in UDFs. It won't cause an exception, but the overall behavior will be non-deterministic.
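The non-determinism comes purely from evaluation order: Spark may process rows in any order, possibly across several executor instances of the UDF, so a stateful call like the one in the question yields results that depend on which row happens to arrive first. This can be sketched without Spark at all; the plain-Python class below mirrors the stateful MWE from the question (the class and method names are illustrative, not Spark API):

```python
# Sketch of why mutable UDF state is order-dependent: this class mirrors the
# logic of the SomeFun MWE, latching `prop` to the first input it ever sees.
class StatefulFun:
    def __init__(self):
        self.prop = 0

    def call(self, value):
        # First non-zero input permanently sets `prop`, just like the MWE.
        if self.prop == 0:
            self.prop = value
        return self.prop + value

# The same three rows, processed in two different orders (as two Spark tasks
# might), produce different results for identical logical input.
a = StatefulFun()
print([a.call(i) for i in [1, 2, 3]])  # [2, 3, 4]

b = StatefulFun()
print([b.call(i) for i in [3, 2, 1]])  # [6, 5, 4]
```

Because the output for each row depends on the order in which earlier rows were seen, no single correct answer exists, which is exactly what "non-deterministic" means here.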