Pyspark Dataframe: Get previous row that meets a condition
Question
For every row in a PySpark DataFrame, I am trying to get a value from the first preceding row that satisfies a certain condition.
That is, if my dataframe looks like this:
X | Flag
1 | 1
2 | 0
3 | 0
4 | 0
5 | 1
6 | 0
7 | 0
8 | 0
9 | 1
10 | 0
I want the output to look like this:
X | Lag_X | Flag
1 | NULL | 1
2 | 1 | 0
3 | 1 | 0
4 | 1 | 0
5 | 1 | 1
6 | 5 | 0
7 | 5 | 0
8 | 5 | 0
9 | 5 | 1
10 | 9 | 0
I thought I could do this with the lag function and a WindowSpec; unfortunately, a WindowSpec doesn't support .filter or .when, so this does not work:
conditional_window = Window().orderBy("X").filter(df["Flag"] == 1)  # Window has no .filter
df = df.withColumn('lag_x', F.lag(df['X'], 1).over(conditional_window))
It seems like this should be simple, but I have been racking my brain trying to find a solution, so any help with this would be greatly appreciated.
Answer
The question is old, but I thought the answer might help others.
Here is a working solution using window and lag functions.
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql import SparkSession

# Get or create a SparkSession (createDataFrame is a method of the
# session, not of the SparkContext)
spark = SparkSession.builder.getOrCreate()

# Create the example DataFrame
a = spark.createDataFrame([(1, 1),
                           (2, 0),
                           (3, 0),
                           (4, 0),
                           (5, 1),
                           (6, 0),
                           (7, 0),
                           (8, 0),
                           (9, 1),
                           (10, 0)],
                          ['X', 'Flag'])
# Define a window ordered by X
win = Window.orderBy("X")

# Condition: the value of "Flag" in the preceding row is not 0
condition = F.lag(F.col("Flag"), 1).over(win) != 0

# New column: where the condition holds, take X - 1
# (the previous row's X, since X is consecutive)
a = a.withColumn("Flag_X", F.when(condition, F.col("X") - 1))
Now we obtain the DataFrame shown below:
+---+----+------+
| X|Flag|Flag_X|
+---+----+------+
| 1| 1| null|
| 2| 0| 1|
| 3| 0| null|
| 4| 0| null|
| 5| 1| null|
| 6| 0| 5|
| 7| 0| null|
| 8| 0| null|
| 9| 1| null|
| 10| 0| 9|
+---+----+------+
Fill the null values by carrying the last non-null value forward:
a = a.withColumn("Flag_X",
                 F.last(F.col("Flag_X"), ignorenulls=True).over(win))
So the final DataFrame is as required:
+---+----+------+
| X|Flag|Flag_X|
+---+----+------+
| 1| 1| null|
| 2| 0| 1|
| 3| 0| 1|
| 4| 0| 1|
| 5| 1| 1|
| 6| 0| 5|
| 7| 0| 5|
| 8| 0| 5|
| 9| 1| 5|
| 10| 0| 9|
+---+----+------+