How to use Scala UDF in PySpark?
Question
I want to be able to use a Scala function as a UDF in PySpark:
package com.test

import org.apache.spark.sql.functions.udf

object ScalaPySparkUDFs extends Serializable {
    def testFunction1(x: Int): Int = { x * 2 }
    def testUDFFunction1 = udf { x: Int => testFunction1(x) }
}
I can access testFunction1 in PySpark and have it return values:
functions = sc._jvm.com.test.ScalaPySparkUDFs
functions.testFunction1(10)
What I want to be able to do is use this function as a UDF, ideally in a withColumn call:
row = Row("Value")
numbers = sc.parallelize([1,2,3,4]).map(row).toDF()
numbers.withColumn("Result", testUDFFunction1(numbers['Value']))
I think a promising approach is the one found here: Spark: How to map Python with Scala or Java User Defined Functions?
However, when changing the code found there to use testUDFFunction1 instead:
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq

def udf_test(col):
    sc = SparkContext._active_spark_context
    _f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1.apply
    return Column(_f(_to_seq(sc, [col], _to_java_column)))
I get:
AttributeError: 'JavaMember' object has no attribute 'apply'
I don't understand this, because I believe testUDFFunction1 does have an apply method?
I do not want to use expressions of the type found here: Register UDF to SqlContext from Scala to use in PySpark
Any suggestions as to how to make this work would be appreciated!
Answer
The question you've linked uses a Scala object. A Scala object is a singleton, so you can use its apply method directly.
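The distinction can be sketched in plain Scala, with no Spark required (Doubler, Wrapper, and Holder are hypothetical names used only for illustration):

```scala
// A singleton object with its own apply method: callable directly as Doubler(3).
object Doubler {
  def apply(x: Int): Int = x * 2
}

// An object carrying an apply method, analogous to UserDefinedFunction.
class Wrapper(f: Int => Int) {
  def apply(x: Int): Int = f(x)
}

// A nullary def that returns such an object, analogous to testUDFFunction1.
// In Scala, Holder.makeWrapper(3) works because the compiler inserts the call;
// over Py4J nothing is inserted for you, so you must call makeWrapper()
// explicitly before you can reach the returned object's apply method.
object Holder {
  def makeWrapper = new Wrapper(_ * 2)
}
```

In Scala source both forms look interchangeable, which is why the error only surfaces when the nullary def is accessed through Py4J.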
Here you use a nullary function which returns an object of the UserDefinedFunction class, so you have to call the function first:
_f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1() # Note () at the end
Column(_f.apply(_to_seq(sc, [col], _to_java_column)))
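Putting it together, a corrected udf_test wrapper might look like the sketch below. This is an assumption-laden sketch, not a verified implementation: it presumes the com.test.ScalaPySparkUDFs jar from the question is on the driver's classpath and a SparkContext is active, and it cannot run without them.

```python
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq

def udf_test(col):
    sc = SparkContext._active_spark_context
    # Call testUDFFunction1() first to obtain the UserDefinedFunction
    # instance; only then does .apply exist to invoke on the column.
    _f = sc._jvm.com.test.ScalaPySparkUDFs.testUDFFunction1()
    return Column(_f.apply(_to_seq(sc, [col], _to_java_column)))

# Usage, following the question's example:
# row = Row("Value")
# numbers = sc.parallelize([1, 2, 3, 4]).map(row).toDF()
# numbers.withColumn("Result", udf_test(numbers["Value"])).show()
```

Note that _to_seq and _to_java_column are private PySpark helpers; they work here but are not part of the stable public API.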