Register UDF to SqlContext from Scala to use in PySpark


Question

Is it possible to register a UDF (or function) written in Scala to use in PySpark? E.g.:

val mytable = sc.parallelize(1 to 2).toDF("spam")
mytable.registerTempTable("mytable")
def addOne(m: Integer): Integer = m + 1
// spam: 1, 2

In Scala, the following is now possible:

val UDFaddOne = sqlContext.udf.register("UDFaddOne", addOne _)
val mybiggertable = mytable.withColumn("moreSpam", UDFaddOne(mytable("spam")))
// spam: 1, 2
// moreSpam: 2, 3

I would like to use "UDFaddOne" in PySpark like:

%pyspark

mytable = sqlContext.table("mytable")
UDFaddOne = sqlContext.udf("UDFaddOne") # does not work
mybiggertable = mytable.withColumn("+1", UDFaddOne(mytable("spam"))) # does not work

Background: We are a team of developers, some coding in Scala and some in Python, who would like to share already-written functions. Packaging them into a library and importing that would also be an option.

Answer

As far as I know, PySpark doesn't provide any equivalent of the callUDF function, so a registered UDF cannot be accessed directly.

The simplest solution here is to use a raw SQL expression:

from pyspark.sql.functions import expr

mytable.withColumn("moreSpam", expr("UDFaddOne({})".format("spam")))

## OR
sqlContext.sql("SELECT *, UDFaddOne(spam) AS moreSpam FROM mytable")

## OR
mytable.selectExpr("*", "UDFaddOne(spam) AS moreSpam")
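
Note that this only works if the Scala UDF has been registered on the same SQLContext the Python side uses. In Zeppelin, for example, the %spark and %pyspark interpreters typically share a single SQLContext, so registering "UDFaddOne" in a Scala paragraph first makes it resolvable from SQL in a Python paragraph.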

This approach is rather limited, so if you need to support more complex workflows you should build a package and provide complete Python wrappers. You'll find an example UDAF wrapper in my answer to Spark: How to map Python with Scala or Java User Defined Functions?
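
As a rough illustration of that wrapper pattern, here is a minimal Python sketch. It assumes a hypothetical Scala object com.example.udfs.Functions whose addOneUdf() method returns the registered org.apache.spark.sql.expressions.UserDefinedFunction, and it relies on PySpark's internal Py4J helpers (_to_java_column, _to_seq), which are not a stable public API:

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq

def udf_add_one(col):
    # Look up the JVM-side UserDefinedFunction through Py4J.
    # com.example.udfs.Functions.addOneUdf() is an assumed name for this sketch.
    sc = SparkContext._active_spark_context
    jvm_udf = sc._jvm.com.example.udfs.Functions.addOneUdf()
    # Convert the Python Column to its JVM counterpart, apply the UDF,
    # and wrap the resulting JVM Column back into a Python Column.
    return Column(jvm_udf.apply(_to_seq(sc, [col], _to_java_column)))

# Usage, mirroring the Scala example above:
# mybiggertable = mytable.withColumn("moreSpam", udf_add_one(mytable["spam"]))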
