如何使用JAVA在Spark DataFrame上调用UDF? [英] How do I call a UDF on a Spark DataFrame using JAVA?

查看:433
本文介绍了如何使用JAVA在Spark DataFrame上调用UDF?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

此处,但没有足够的积分在那里发表评论。

Similar question as here, but don't have enough points to comment there.

根据最新的Spark 文档 udf 可以两种不同的方式使用,一种用SQL另一个使用DataFrame。我找到了多个如何使用sql使用 udf 的示例,但是无法找到有关如何使用 udf 直接在DataFrame上。

According to the latest Spark documentation an udf can be used in two different ways, one with SQL and another with a DataFrame. I found multiple exampled of how to use an udf with sql, but have not been able to find any on how to use a udf directly on a DataFrame.

o.p.提供的解决方案在上面链接的问题上使用 __ callUDF()__ 这是 _deprecated _ 并将根据Spark在Spark 2.0中删除Java API文档。在那里,它说:

The solution provided by the o.p. on the question linked above uses __callUDF()__ which is _deprecated_ and will be removed in Spark 2.0 according to the Spark Java API documentation. There, it says:


因为它是多余的udf()

"since it's redundant with udf()"

所以这意味着我应该可以使用 __ udf()__ 来计算我的 udf ,但我无法弄清楚如何做到这一点。我没有偶然发现一些拼写Java-Spark程序语法的东西。我缺少什么?

so this means I should be able to use __udf()__ to cal a my udf, but I can't figure out how to do that. I have not stumbled on something that spells the syntax for Java-Spark programs. What am I missing?

import org.apache.spark.sql.api.java.UDF1;
.
.    
UDF1 mode = new UDF1<String[], String>() {
    public String call(final String[] types) throws Exception {
        return types[0];
    }
};

sqlContext.udf().register("mode", mode, DataTypes.StringType);
df.???????? how do I call my udf (mode) on a given column of my DataFrame df?


推荐答案

Spark> = 2.3

可以直接调用Scala风格的 udf

Scala-style udf can be invoked directly:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;

UserDefinedFunction mode = udf(
  (Seq<String> ss) -> ss.headOption(), DataTypes.StringType
);

df.select(mode.apply(col("vs"))).show();

Spark< 2.3

即使我们假设您的UDF很有用且无法用简单的 getItem 替换称它有不正确的签名。使用Scala WrappedArray 而不是普通的Java Arrays公开数组列,因此您必须调整签名:

Even if we assume that your UDF is useful and cannot be replaced by a simple getItem call it has incorrect signature. Array columns are exposed using Scala WrappedArray not plain Java Arrays so you have to adjust the signature:

UDF1 mode = new UDF1<Seq<String>, String>() {
  public String call(final Seq<String> types) throws Exception {
    return types.headOption();
  }
};

如果UDF已经注册:

sqlContext.udf().register("mode", mode, DataTypes.StringType);

您只需使用 callUDF (1.5中引入的新功能)按名称调用:

you can simply use callUDF (which is a new function introduced in 1.5) to call it by name:

df.select(callUDF("mode", col("vs"))).show();

您也可以在 selectExprs 中使用它:

You can also use it in selectExprs:

df.selectExpr("mode(vs)").show();

这篇关于如何使用JAVA在Spark DataFrame上调用UDF?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆