Spark: How to map Python with Scala or Java User Defined Functions?
Question
Let's say for instance that my team has chosen Python as the reference language to develop with Spark. But later, for performance reasons, we would like to develop specific Scala or Java libraries in order to map them to our Python code (something similar to Python stubs with Scala or Java skeletons).
Don't you think it is possible to interface new customized Python methods with some Scala or Java User Defined Functions under the hood?
Answer
Spark 2.1+
You can use SQLContext.registerJavaFunction:

Register a java UDF so it can be used in SQL statements.

It requires a name, the fully qualified name of the Java class, and an optional return type. Unfortunately, for now it can be used only in SQL statements (or with expr / selectExpr) and requires a Java org.apache.spark.sql.api.java.UDF*:
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.1.0"
)
package com.example.spark.udfs

import org.apache.spark.sql.api.java.UDF1

class addOne extends UDF1[Integer, Integer] {
  def call(x: Integer) = x + 1
}
sqlContext.registerJavaFunction("add_one", "com.example.spark.udfs.addOne")
sqlContext.sql("SELECT add_one(1)").show()
## +------+
## |UDF(1)|
## +------+
## | 2|
## +------+
Version independent:

I wouldn't go so far as to say it is supported, but it is certainly possible. All SQL functions currently available in PySpark are simply wrappers around the Scala API.
Let's assume I want to reuse the GroupConcat UDAF I've created as an answer to SPARK SQL replacement for mysql GROUP_CONCAT aggregate function, and that it is located in a package com.example.udaf:
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql import Row

row = Row("k", "v")
df = sc.parallelize([
    row(1, "foo1"), row(1, "foo2"), row(2, "bar1"), row(2, "bar2")]).toDF()

def groupConcat(col):
    """Group and concatenate values for a given column

    >>> df = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
    >>> df.select(groupConcat("v").alias("vs"))
    [Row(vs=u'foo,bar')]
    """
    sc = SparkContext._active_spark_context
    # It is possible to use java_import to avoid full package path
    _groupConcat = sc._jvm.com.example.udaf.GroupConcat.apply
    # Converting to Seq to match apply(exprs: Column*)
    return Column(_groupConcat(_to_seq(sc, [col], _to_java_column)))

df.groupBy("k").agg(groupConcat("v").alias("vs")).show()
## +---+---------+
## | k| vs|
## +---+---------+
## | 1|foo1,foo2|
## | 2|bar1,bar2|
## +---+---------+
There are far too many leading underscores for my taste, but as you can see it can be done.