如何将整行传递给 UDF - Spark DataFrame 过滤器 [英] How to pass whole Row to UDF - Spark DataFrame filter

查看:41
本文介绍了如何将整行传递给 UDF - Spark DataFrame 过滤器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在为具有很多内部结构的复杂 JSON 数据集编写过滤器函数.传递单个列太麻烦了.

I'm writing filter function for complex JSON dataset with lot's of inner structures. Passing individual columns is too cumbersome.

所以我声明了以下 UDF:

So I declared the following UDF:

val records:DataFrame = = sqlContext.jsonFile("...")
def myFilterFunction(r:Row):Boolean=???
sqlc.udf.register("myFilter", (r:Row)=>myFilterFunction(r))

直觉上我认为它会像这样工作:

Intuitively I'm thinking it will work like this:

records.filter("myFilter(*)=true")

实际的语法是什么?

推荐答案

调用函数时必须使用 struct() 函数来构造行,请按照下列步骤操作.

You have to use struct() function for constructing the row while making a call to the function, follow these steps.

>

导入行,

import org.apache.spark.sql._

定义 UDF

def myFilterFunction(r:Row) = {r.get(0)==r.get(1)} 

注册 UDF

sqlContext.udf.register("myFilterFunction", myFilterFunction _)

创建数据框

val records = sqlContext.createDataFrame(Seq(("sachin", "sachin"), ("aggarwal", "aggarwal1"))).toDF("text", "text2")

使用 UDF

records.filter(callUdf("myFilterFunction",struct($"text",$"text2"))).show

当您希望所有列都传递给 UDF 时.

When u want all columns to be passed to UDF.

records.filter(callUdf("myFilterFunction",struct(records.columns.map(records(_)) : _*))).show 

结果:

+------+------+
|  text| text2|
+------+------+
|sachin|sachin|
+------+------+

这篇关于如何将整行传递给 UDF - Spark DataFrame 过滤器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆