Creating a SparkSQL UDF in Java outside of SQLContext


Question

I would like to create a User-Defined Function in Java that can be called as a Java method within a chain of Apache Spark operators. I'm having trouble finding Java examples that don't require the UDF to exist inside of a SQL query.

Versions

  • Java 8
  • Scala 2.10.6
  • Apache Spark 1.6.0 pre-built for Hadoop 2.6.0

What I've tried that works

I can successfully create a UDF in Java. However, I can't use it unless it's inside a SQL query:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;

sqlContext.udf().register("udfUppercase",
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
oldDF.registerTempTable("df");
DataFrame newDF = sqlContext.sql("SELECT udfUppercase(name) AS name_upper FROM df");

Where I'm stuck

I would expect a non-SQL, method-call-style UDF in Java to look something like this:

import static org.apache.spark.sql.functions.udf;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

UserDefinedFunction udfUppercase = udf(
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
DataFrame newDF = oldDF.withColumn("name_upper", udfUppercase(oldDF.col("name")));

Compiling this leads to a compiler error on the line beginning with "UserDefinedFunction", so obviously my attempt at guessing the right signature is incorrect:

error: no suitable method found for udf((String st[...]ase(),DataType)
    UserDefinedFunction udfUppercase = udf((String string) -> string.toUpperCase(), DataTypes.StringType);
method functions.<RT#1>udf(Function0<RT#1>,TypeTags.TypeTag<RT#1>) is not applicable
    (cannot infer type-variable(s) RT#1
    (argument mismatch; Function0 is not a functional interface
    multiple non-overriding abstract methods found in interface Function0))

This error continues with detail for each of the inferred udf() signatures attempted.
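The key line in that error is "Function0 is not a functional interface": javac can only convert a lambda to an interface with exactly one abstract method, and Scala's Function0, as seen from compiled bytecode, exposes several. A minimal, Spark-free sketch of that rule using only the JDK (the `toUpper` helper is illustrative, not a Spark API):

```java
import java.util.function.Function;

public class LambdaTargetDemo {
    // javac accepts a lambda only where the target type is a functional
    // interface (exactly one abstract method). java.util.function.Function
    // qualifies; Scala's Function0, which the bytecode exposes with multiple
    // abstract methods, does not -- hence the "argument mismatch" error above.
    static String toUpper(String s) {
        Function<String, String> f = str -> str.toUpperCase();
        return f.apply(s);
    }

    public static void main(String[] args) {
        System.out.println(toUpper("name")); // prints NAME
    }
}
```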

What I need

I need to fix the Java code so that I can define and use the udfUppercase UDF without embedding it in a SQL query. I feel like I'm missing something very simple, fundamental, and possibly syntax-y, but could be completely off base.

Working solution (courtesy of zero323, below)

There's no good way to register and use a Java UDF as a Java method, but a UDF registered in the SQLContext can be inserted into a chain of operators using callUDF().

import static org.apache.spark.sql.functions.callUDF;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;

sqlContext.udf().register("udfUppercase",
    (String string) -> string.toUpperCase(), DataTypes.StringType);

DataFrame oldDF = // a simple DataFrame with a "name" column
DataFrame newDF = oldDF.withColumn("name_upper", callUDF("udfUppercase", oldDF.col("name")));

Also, be sure to use callUDF() and not the deprecated callUdf(), which has a different method signature.
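The register-then-call-by-name pattern above is essentially a string-keyed function registry. A hypothetical plain-Java sketch of the idea (the `register`/`call` names here are illustrative stand-ins, not Spark APIs):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class UdfRegistryDemo {
    // Mimics sqlContext.udf().register(name, fn): store the function by name.
    static final Map<String, Function<String, String>> registry = new HashMap<>();

    static void register(String name, Function<String, String> fn) {
        registry.put(name, fn);
    }

    // Mimics callUDF(name, col): look the function up by name and apply it.
    static String call(String name, String arg) {
        return registry.get(name).apply(arg);
    }

    public static void main(String[] args) {
        register("udfUppercase", s -> s.toUpperCase());
        System.out.println(call("udfUppercase", "name")); // prints NAME
    }
}
```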

Answer

Spark >= 2.3

SPARK-22945 (add Java UDF APIs in the functions object) adds a simplified udf API, similar to Scala and Python:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

UserDefinedFunction udfUppercase = udf(
  (String s) -> s.toUpperCase(), DataTypes.StringType
);

df.select(udfUppercase.apply(col("name")));

Spark < 2.3

Long story short, the functions.udf methods are not designed for Java interoperability. All variants require TypeTags, and while it is possible to generate these manually (I am pretty sure I've seen Daniel Darabos show how to do it on SO), it is something you probably want to avoid.

If for some reason you want to avoid writing the UDF in Scala, the simplest thing is to register the UDF and call it by name:

import static org.apache.spark.sql.functions.*;

sqlContext.udf().register("udfUppercase",
  (String string) -> string.toUpperCase(), DataTypes.StringType);

df.select(callUDF("udfUppercase", col("name")));
