Register UDF to SqlContext from Scala to use in PySpark
Question
Is it possible to register a UDF (or function) written in Scala so that it can be used in PySpark? For example:
val mytable = sc.parallelize(1 to 2).toDF("spam")
mytable.registerTempTable("mytable")
def addOne(m: Integer): Integer = m + 1
// Spam: 1, 2
In Scala, the following is now possible:
val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _)
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam")))
// Spam: 1, 2
// moreSpam: 2, 3
I would like to use "UDFaddOne" in PySpark like this:
%pyspark
mytable = sqlContext.table("mytable")
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work
Background: We are a team of developers, some coding in Scala and some in Python, who would like to share already-written functions. Saving them into a library and importing that would also be an option.
Answer
As far as I know, PySpark doesn't provide any equivalent of the callUDF function, so it is not possible to access a registered UDF directly.
The simplest solution here is to use a raw SQL expression:
from pyspark.sql.functions import expr

mytable.withColumn("moreSpam", expr("UDFaddOne(spam)"))
## OR
sqlContext.sql("SELECT *, UDFaddOne(spam) AS moreSpam FROM mytable")
## OR
mytable.selectExpr("*", "UDFaddOne(spam) AS moreSpam")
This approach is rather limited, so if you need to support more complex workflows you should build a package and provide complete Python wrappers. You'll find an example UDAF wrapper in my answer to Spark: How to map Python with Scala or Java User Defined Functions?