How to filter data using window functions in spark
Problem description
I have the following data:
rowid uid time code
1 1 5 a
2 1 6 b
3 1 7 c
4 2 8 a
5 2 9 c
6 2 9 c
7 2 10 c
8 2 11 a
9 2 12 c
Now I want to filter the data in such a way that I can remove rows 6 and 7, because for a particular uid I only want to keep one row with the value 'c' in code.
So the expected data should be:
rowid uid time code
1 1 5 a
2 1 6 b
3 1 7 c
4 2 8 a
5 2 9 c
8 2 11 a
9 2 12 c
I'm using a window function, something like this:
val window = Window.partitionBy("uid").orderBy("time")
val change = ((lag("code", 1).over(window) <=> "c")).cast("int")
这将帮助我们用代码c"标识每一行.我可以扩展它以过滤掉行以获得预期的数据
This would help us identify each row with a code 'c'. Can i extend this to filter out rows to get the expected data
Recommended answer
如果您只想删除 code = "c" 的行(每个 uid 的第一个除外),您可以尝试以下操作:
If you want to remove only the lines where code = "c" (except the first one for each uid) you could try the following:
val window = Window.partitionBy("uid", "code").orderBy("time")
val result = df
.withColumn("rank", row_number().over(window))
.where(
(col("code") !== "c") ||
col("rank") === 1
)
.drop("rank")
Edit based on the new information:
val window = Window.partitionBy("uid").orderBy("time")
val result = df
.withColumn("lagValue", coalesce(lag(col("code"), 1).over(window), lit("")))
.where(
(col("code") !== "c") ||
(col("lagValue") !== "c")
)
.drop("lagValue")