Pyspark Dataframe: Get previous row that meets a condition


Problem Description

For every row in a PySpark DataFrame, I am trying to get a value from the first preceding row that satisfies a certain condition:

That is, if my dataframe looks like this:

X  | Flag
1  | 1
2  | 0
3  | 0
4  | 0
5  | 1
6  | 0
7  | 0
8  | 0
9  | 1
10 | 0

I want output like this:

X  | Lag_X | Flag
1  | NULL  | 1
2  | 1     | 0
3  | 1     | 0
4  | 1     | 0
5  | 1     | 1
6  | 5     | 0
7  | 5     | 0
8  | 5     | 0
9  | 5     | 1
10 | 9     | 0

I thought I could do this with the lag function and a WindowSpec; unfortunately, WindowSpec doesn't support .filter or .when, so this does not work:

conditional_window = Window().orderBy("X").filter(df["Flag"] == 1)
df = df.withColumn('lag_x', F.lag(df["X"], 1).over(conditional_window))

It seems like this should be simple, but I have been racking my brain trying to find a solution, so any help with this would be greatly appreciated.

Recommended Answer

The question is old, but I thought the answer might help others.

Here is a working solution using window and lag functions:

from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create the example DataFrame
a = spark.createDataFrame([(1, 1),
                           (2, 0),
                           (3, 0),
                           (4, 0),
                           (5, 1),
                           (6, 0),
                           (7, 0),
                           (8, 0),
                           (9, 1),
                           (10, 0)],
                          ['X', 'Flag'])

# Use a window function
win = Window.orderBy("X")
# Condition: the preceding row's value in column "Flag" is not 0
condition = F.lag(F.col("Flag"), 1).over(win) != 0
# Add a new column: where the condition holds, take the previous row's "X";
# everywhere else the value is null
a = a.withColumn("Flag_X", F.when(condition, F.lag(F.col("X"), 1).over(win)))

Now we obtain a DataFrame as shown below:

+---+----+------+
|  X|Flag|Flag_X|
+---+----+------+
|  1|   1|  null|
|  2|   0|     1|
|  3|   0|  null|
|  4|   0|  null|
|  5|   1|  null|
|  6|   0|     5|
|  7|   0|  null|
|  8|   0|  null|
|  9|   1|  null|
| 10|   0|     9|
+---+----+------+

Fill the null values by carrying the last non-null value forward:

a = a.withColumn("Flag_X",
                 F.last(F.col("Flag_X"), ignorenulls=True).over(win))

So the final DataFrame is as required:

+---+----+------+
|  X|Flag|Flag_X|
+---+----+------+
|  1|   1|  null|
|  2|   0|     1|
|  3|   0|     1|
|  4|   0|     1|
|  5|   1|     1|
|  6|   0|     5|
|  7|   0|     5|
|  8|   0|     5|
|  9|   1|     5|
| 10|   0|     9|
+---+----+------+

