Spark: How to map Python with Scala or Java User Defined Functions?


Question

Let's say, for instance, that my team has chosen Python as the reference language to develop with Spark. But later, for performance reasons, we would like to develop specific Scala or Java libraries in order to map them with our Python code (something similar to Python stubs with Scala or Java skeletons).

Do you think it is possible to interface new customized Python methods with some Scala or Java User Defined Functions under the hood?

Answer

Spark 2.1+

You can use SQLContext.registerJavaFunction:

Register a java UDF so it can be used in SQL statements.

which requires a name, the fully qualified name of a Java class, and an optional return type. Unfortunately, for now it can be used only in SQL statements (or with expr / selectExpr) and requires a Java org.apache.spark.sql.api.java.UDF*:

// build.sbt
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.1.0"
)

package com.example.spark.udfs

import org.apache.spark.sql.api.java.UDF1

// Implements the Java UDF1 interface so registerJavaFunction can pick it up
class addOne extends UDF1[Integer, Integer] {
  def call(x: Integer): Integer = x + 1
}

# Register the Scala/Java UDF under the name "add_one" and call it from SQL
sqlContext.registerJavaFunction("add_one", "com.example.spark.udfs.addOne")
sqlContext.sql("SELECT add_one(1)").show()

## +------+
## |UDF(1)|
## +------+
## |     2|
## +------+
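
Since the registered function is only visible to the SQL parser, calling it on a DataFrame goes through expr / selectExpr. A minimal sketch, assuming a throwaway DataFrame with an integer column x (names chosen only for illustration):

from pyspark.sql.functions import expr

df = sqlContext.createDataFrame([(1, ), (2, )], ("x", ))

# Both forms are parsed as SQL, where add_one has been registered
df.selectExpr("add_one(x) AS x_plus_one").show()
df.select(expr("add_one(x)")).show()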

Version independent:

I wouldn't go so far as to say it is supported, but it is certainly possible. All SQL functions currently available in PySpark are simply wrappers around the Scala API.
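
For example, the wrappers in pyspark.sql.functions follow roughly this pattern (a simplified sketch, not the actual source):

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column

def upper(col):
    # Delegate to org.apache.spark.sql.functions.upper on the JVM side
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.upper(_to_java_column(col)))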

Let's assume I want to reuse the GroupConcat UDAF I've created as an answer to SPARK SQL replacement for mysql GROUP_CONCAT aggregate function, and that it is located in the package com.example.udaf.
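
For context, the JVM side could look roughly like this. It is only a hedged sketch of a UserDefinedAggregateFunction (the actual implementation is in the linked answer); the detail that matters for the bridge is that GroupConcat exposes apply(exprs: Column*): Column, inherited from UserDefinedAggregateFunction, which is what the Python wrapper calls:

package com.example.udaf

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Sketch only: concatenates the string values of a column, comma separated
object GroupConcat extends UserDefinedAggregateFunction {
  def inputSchema: StructType = new StructType().add("x", StringType)
  def bufferSchema: StructType = new StructType().add("acc", ArrayType(StringType))
  def dataType: DataType = StringType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer.update(0, Seq.empty[String])

  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0))
      buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1.update(0, buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0))

  def evaluate(buffer: Row): Any =
    buffer.getSeq[String](0).mkString(",")
}

The PySpark wrapper then looks like this: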

from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql import Row

row = Row("k", "v")
df = sc.parallelize([
    row(1, "foo1"), row(1, "foo2"), row(2, "bar1"), row(2, "bar2")]).toDF()

def groupConcat(col):
    """Group and concatenate values for a given column

    >>> df = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
    >>> df.select(groupConcat("v").alias("vs")).collect()
    [Row(vs=u'foo,bar')]
    """
    sc = SparkContext._active_spark_context
    # It is possible to use java_import to avoid full package path
    _groupConcat = sc._jvm.com.example.udaf.GroupConcat.apply
    # Converting to Seq to match apply(exprs: Column*)
    return Column(_groupConcat(_to_seq(sc, [col], _to_java_column)))

df.groupBy("k").agg(groupConcat("v").alias("vs")).show()

## +---+---------+
## |  k|       vs|
## +---+---------+
## |  1|foo1,foo2|
## |  2|bar1,bar2|
## +---+---------+

There are far too many leading underscores for my taste, but as you can see it can be done.
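
As the comment in the snippet hints, java_import can be used to avoid spelling out the full package path; a possible variant (again just a sketch):

from py4j.java_gateway import java_import

# Make com.example.udaf.GroupConcat resolvable under its short name on the JVM view
java_import(sc._jvm, "com.example.udaf.GroupConcat")
_groupConcat = sc._jvm.GroupConcat.apply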
