Register UDF to SqlContext from Scala to use in PySpark
Question
Is it possible to register a UDF (or function) written in Scala so that it can be used in PySpark? E.g.:
val mytable = sc.parallelize(1 to 2).toDF("spam")
mytable.registerTempTable("mytable")
def addOne(m: Integer): Integer = m + 1
// Spam: 1, 2
In Scala, the following is now possible:
val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _)
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam")))
// Spam: 1, 2
// moreSpam: 2, 3
I would like to use "UDFaddOne" in PySpark like:
%pyspark
mytable = sqlContext.table("mytable")
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work
Background: We are a team of developers, some coding in Scala and some in Python, who would like to share already-written functions. Saving the function into a library and importing it would also be an option.
Accepted answer
As far as I know, PySpark doesn't provide any equivalent of the `callUDF` function, so it is not possible to access a registered UDF directly.
The simplest solution here is to use a raw SQL expression:
from pyspark.sql.functions import expr

mytable.withColumn("moreSpam", expr("UDFaddOne(spam)"))
## OR
sqlContext.sql("SELECT *, UDFaddOne(spam) AS moreSpam FROM mytable")
## OR
mytable.selectExpr("*", "UDFaddOne(spam) AS moreSpam")
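If you take the raw-SQL route in several notebooks, it can help to build the statement in one place. The sketch below is a plain string helper, not part of any Spark API; the function name `call_scala_udf_sql` is illustrative:

```python
def call_scala_udf_sql(table, udf_name, col, alias):
    """Build the SELECT statement that applies a Scala-registered UDF
    to one column and exposes the result under a new alias."""
    return "SELECT *, {0}({1}) AS {2} FROM {3}".format(
        udf_name, col, alias, table)

# In PySpark the resulting string would be executed with:
#   sqlContext.sql(call_scala_udf_sql("mytable", "UDFaddOne", "spam", "moreSpam"))
print(call_scala_udf_sql("mytable", "UDFaddOne", "spam", "moreSpam"))
# SELECT *, UDFaddOne(spam) AS moreSpam FROM mytable
```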
This approach is rather limited, so if you need to support more complex workflows you should build a package and provide complete Python wrappers. You'll find an example UDAF wrapper in my answer to Spark: How to map Python with Scala or Java User Defined Functions?