如何使用 JAVA 在 Spark DataFrame 上调用 UDF? [英] How do I call a UDF on a Spark DataFrame using JAVA?

查看:41
本文介绍了如何使用 JAVA 在 Spark DataFrame 上调用 UDF?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

此处的类似问题,但没有足够的分数在那里发表评论.

Similar question as here, but don't have enough points to comment there.

根据Spark最新documentation 可以以两种不同的方式使用 udf,一种使用 SQL,另一种使用 DataFrame.我找到了多个关于如何在 sql 中使用 udf 的示例,但没有找到任何关于如何直接在 DataFrame 上使用 udf 的示例.

According to the latest Spark documentation an udf can be used in two different ways, one with SQL and another with a DataFrame. I found multiple examples of how to use an udf with sql, but have not been able to find any on how to use a udf directly on a DataFrame.

o.p. 提供的解决方案在上面链接的问题上使用 __callUDF()__ 这是 _deprecated_ 并将根据 Spark Java API 文档在 Spark 2.0 中删除.在那里,它说:

The solution provided by the o.p. on the question linked above uses __callUDF()__ which is _deprecated_ and will be removed in Spark 2.0 according to the Spark Java API documentation. There, it says:

因为它与 udf() 是多余的"

"since it's redundant with udf()"

所以这意味着我应该能够使用 __udf()__ 来校准我的 udf,但我不知道如何做到这一点.我没有偶然发现任何说明 Java-Spark 程序语法的东西.我错过了什么?

so this means I should be able to use __udf()__ to cal a my udf, but I can't figure out how to do that. I have not stumbled on anything that spells out the syntax for Java-Spark programs. What am I missing?

import org.apache.spark.sql.api.java.UDF1;
.
.    
UDF1 mode = new UDF1<String[], String>() {
    public String call(final String[] types) throws Exception {
        return types[0];
    }
};

sqlContext.udf().register("mode", mode, DataTypes.StringType);
df.???????? how do I call my udf (mode) on a given column of my DataFrame df?

推荐答案

Spark >= 2.3

Scala 风格的 udf 可以直接调用:

Scala-style udf can be invoked directly:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;

UserDefinedFunction mode = udf(
  (Seq<String> ss) -> ss.headOption(), DataTypes.StringType
);

df.select(mode.apply(col("vs"))).show();

火花<2.3

即使我们假设您的 UDF 是有用的并且不能被简单的 getItem 调用替换,它也有不正确的签名.数组列使用 Scala WrappedArray 而不是普通的 Java 数组公开,因此您必须调整签名:

Even if we assume that your UDF is useful and cannot be replaced by a simple getItem call it has incorrect signature. Array columns are exposed using Scala WrappedArray not plain Java Arrays so you have to adjust the signature:

UDF1 mode = new UDF1<Seq<String>, String>() {
  public String call(final Seq<String> types) throws Exception {
    return types.headOption();
  }
};

如果 UDF 已经注册:

If UDF is already registered:

sqlContext.udf().register("mode", mode, DataTypes.StringType);

你可以简单地使用 callUDF(这是 1.5 中引入的新函数)以按名称调用它:

you can simply use callUDF (which is a new function introduced in 1.5) to call it by name:

df.select(callUDF("mode", col("vs"))).show();

你也可以在selectExprs中使用:

df.selectExpr("mode(vs)").show();

这篇关于如何使用 JAVA 在 Spark DataFrame 上调用 UDF?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆