Creating a SparkSQL UDF in Java outside of SQLContext


Question

I would like to create a User-Defined Function in Java that can be called as a Java method within a chain of Apache Spark operators. I'm having trouble finding Java examples that don't require the UDF to exist inside a SQL query.

Versions

  • Java 8
  • Scala 2.10.6
  • Apache Spark 1.6.0, pre-built for Hadoop 2.6.0

What I've tried

I can successfully create a UDF in Java. However, I can't use it unless it's inside a SQL query:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;

sqlContext.udf().register("udfUppercase",
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
oldDF.registerTempTable("df");
DataFrame newDF = sqlContext.sql("SELECT udfUppercase(name) AS name_upper FROM df");

Where I'm stuck

I would expect a non-SQL, method-call-style UDF in Java to look something like this:

import static org.apache.spark.sql.functions.udf;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

UserDefinedFunction udfUppercase = udf(
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
newDF = oldDF.withColumn("name_upper", udfUppercase(oldDF.col("name")));

Compiling this leads to a compiler error on the line beginning with "UserDefinedFunction", so evidently my attempt at guessing the right signature is incorrect:

error: no suitable method found for udf((String st[...]ase(),DataType)
    UserDefinedFunction udfUppercase = udf((String string) -> string.toUpperCase(), DataTypes.StringType);
method functions.<RT#1>udf(Function0<RT#1>,TypeTags.TypeTag<RT#1>) is not applicable
    (cannot infer type-variable(s) RT#1
    (argument mismatch; Function0 is not a functional interface
    multiple non-overriding abstract methods found in interface Function0))

This error continues with detail for each of the inferred udf() signatures attempted.
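The shape of that error is worth understanding: a Java lambda can only implement a functional interface, i.e. an interface with exactly one abstract method (SAM). In Scala 2.10, the FunctionN traits compile to interfaces with several abstract methods, so javac cannot target them with a lambda, which is exactly what "multiple non-overriding abstract methods found in interface Function0" means. A minimal, Spark-free sketch of the same failure mode (the interface names here are illustrative stand-ins, not Spark's or Scala's real classes):

```java
// Mimic of a single-abstract-method (SAM) interface, like Spark's UDF1:
// a lambda can implement it because there is exactly one abstract method.
interface Sam1<T, R> {
    R call(T value);
}

// Mimic of how a pre-2.12 Scala FunctionN trait looks to javac:
// more than one abstract method, so it is NOT a functional interface.
interface NotSam<T, R> {
    R apply(T value);
    <V> NotSam<T, V> andThen(NotSam<R, V> g);
}

public class LambdaTargets {
    public static void main(String[] args) {
        // Compiles fine: Sam1 is a functional interface.
        Sam1<String, String> upper = s -> s.toUpperCase();
        System.out.println(upper.call("spark")); // SPARK

        // NotSam<String, String> bad = s -> s.toUpperCase();
        // ^ uncommenting this fails with "multiple non-overriding abstract
        //   methods found", the same kind of error the udf() call produces.
    }
}
```

This is why the Java-friendly parts of Spark's API accept the UDF0..UDF10 interfaces rather than Scala functions.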

What I need

I need to fix the Java code so that I can define and use the udfUppercase UDF without embedding it in a SQL query. I feel like I'm missing something very simple, fundamental, and possibly syntax-related, but I could be completely off base.

Working solution (courtesy of zero323's answer below)

There's no good way to register a Java UDF and then use it as a plain Java method, but a UDF registered in the SQLContext can be inserted into a chain of operators using callUDF().

import static org.apache.spark.sql.functions.callUDF;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;

sqlContext.udf().register("udfUppercase",
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
DataFrame newDF = oldDF.withColumn("name_upper", callUDF("udfUppercase", oldDF.col("name")));

Also, be sure to use callUDF() and not the deprecated callUdf(), which has a different method signature.

Accepted answer

Spark >= 2.3

SPARK-22945 (add java UDF APIs in the functions object) adds a simplified udf API, similar to Scala and Python:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

UserDefinedFunction udfUppercase = udf(
  (String s) -> s.toUpperCase(), DataTypes.StringType
);

df.select(udfUppercase.apply(col("name")));

Spark < 2.3

Long story short, the functions.udf methods are not designed for Java interoperability. All variants require TypeTags, and while it is possible to generate these manually (I am pretty sure I've seen Daniel Darabos show how to do it on SO), it is something you probably want to avoid.

If for some reason you want to avoid writing the UDF in Scala, the simplest thing is to register the UDF and call it by name:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.types.DataTypes;

sqlContext.udf().register("udfUppercase",
  (String string) -> string.toUpperCase(), DataTypes.StringType);

df.select(callUDF("udfUppercase", col("name")));
