如何将整行传递给 UDF - Spark DataFrame 过滤器 [英] How to pass whole Row to UDF - Spark DataFrame filter
本文介绍了如何将整行传递给 UDF - Spark DataFrame 过滤器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在为具有很多内部结构的复杂 JSON 数据集编写过滤器函数.传递单个列太麻烦了.
I'm writing filter function for complex JSON dataset with lot's of inner structures. Passing individual columns is too cumbersome.
所以我声明了以下 UDF:
So I declared the following UDF:
val records:DataFrame = = sqlContext.jsonFile("...")
def myFilterFunction(r:Row):Boolean=???
sqlc.udf.register("myFilter", (r:Row)=>myFilterFunction(r))
直觉上我认为它会像这样工作:
Intuitively I'm thinking it will work like this:
records.filter("myFilter(*)=true")
实际的语法是什么?
推荐答案
调用函数时必须使用 struct()
函数来构造行,请按照下列步骤操作.
You have to use struct()
function for constructing the row while making a call to the function, follow these steps.
>
导入行,
import org.apache.spark.sql._
定义 UDF
def myFilterFunction(r:Row) = {r.get(0)==r.get(1)}
注册 UDF
sqlContext.udf.register("myFilterFunction", myFilterFunction _)
创建数据框
val records = sqlContext.createDataFrame(Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1"))).toDF("text", "text2")
使用 UDF
records.filter(callUdf("myFilterFunction",struct($"text",$"text2"))).show
当您希望所有列都传递给 UDF 时.
When u want all columns to be passed to UDF.
records.filter(callUdf("myFilterFunction",struct(records.columns.map(records(_)) : _*))).show
结果:
+------+------+
| text| text2|
+------+------+
|sachin|sachin|
+------+------+
这篇关于如何将整行传递给 UDF - Spark DataFrame 过滤器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文