How do I call a UDF on a Spark DataFrame using JAVA?
Question
Similar question as here, but I don't have enough points to comment there.
According to the latest Spark documentation, a udf can be used in two different ways, one with SQL and another with a DataFrame. I found multiple examples of how to use a udf with SQL, but have not been able to find any on how to use a udf directly on a DataFrame.
The solution provided by the o.p. on the question linked above uses callUDF(), which is deprecated and will be removed in Spark 2.0 according to the Spark Java API documentation. There, it says:
"since it's redundant with udf()"
so this means I should be able to use udf() to call my udf, but I can't figure out how to do that. I have not stumbled on anything that spells out the syntax for Java-Spark programs. What am I missing?
import org.apache.spark.sql.api.java.UDF1;
...
UDF1<String[], String> mode = new UDF1<String[], String>() {
    public String call(final String[] types) throws Exception {
        return types[0];
    }
};

sqlContext.udf().register("mode", mode, DataTypes.StringType);

df.???????? // how do I call my udf (mode) on a given column of my DataFrame df?
Answer
Spark >= 2.3
Scala-style udf can be invoked directly:
import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;

UserDefinedFunction mode = udf(
    (Seq<String> ss) -> ss.headOption(), DataTypes.StringType
);

df.select(mode.apply(col("vs"))).show();
Spark < 2.3
Even if we assume that your UDF is useful and cannot be replaced by a simple getItem call, it has an incorrect signature. Array columns are exposed using Scala WrappedArray, not plain Java arrays, so you have to adjust the signature:
UDF1<Seq<String>, String> mode = new UDF1<Seq<String>, String>() {
    public String call(final Seq<String> types) throws Exception {
        // headOption() would return a Scala Option, not a String,
        // so unwrap explicitly: first element, or null when empty
        return types.isEmpty() ? null : types.head();
    }
};
If the UDF is already registered:
sqlContext.udf().register("mode", mode, DataTypes.StringType);
you can simply use callUDF (a function introduced in 1.5) to call it by name:
df.select(callUDF("mode", col("vs"))).show();
You can also use it in selectExpr:
df.selectExpr("mode(vs)").show();
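Spark aside, the body of the adjusted UDF is just "first element of a sequence, or null when it is empty". A minimal plain-Java sketch of that logic, written against java.util.List so it runs without a Spark cluster (the method name firstOrNull is hypothetical; in real Spark code the input would be a scala.collection.Seq):

```java
import java.util.Arrays;
import java.util.List;

public class Main {
    // Head-or-null logic from the corrected UDF body above. In the actual
    // UDF1 the argument is a scala.collection.Seq; List stands in for it here.
    static String firstOrNull(List<String> types) {
        return (types == null || types.isEmpty()) ? null : types.get(0);
    }

    public static void main(String[] args) {
        System.out.println(firstOrNull(Arrays.asList("x", "y"))); // prints x
        System.out.println(firstOrNull(Arrays.asList()));         // prints null
    }
}
```

Returning null for an empty array mirrors what Spark does with a missing value: it surfaces in the result column as NULL rather than throwing, which is usually what you want from a column-level function.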