Spark-带有递归的窗口? -有条件地跨行传播值 [英] Spark - Window with recursion? - Conditionally propagating values across rows
问题描述
我有以下数据框显示购买收入.
I have the following dataframe showing the revenue of purchases.
+-------+--------+-------+
|user_id|visit_id|revenue|
+-------+--------+-------+
| 1| 1| 0|
| 1| 2| 0|
| 1| 3| 0|
| 1| 4| 100|
| 1| 5| 0|
| 1| 6| 0|
| 1| 7| 200|
| 1| 8| 0|
| 1| 9| 10|
+-------+--------+-------+
最终,我希望新列purch_revenue
在每一行中显示购买产生的收入.
作为一种解决方法,我还尝试引入了购买标识符purch_id
,该标识符在每次购买时都会递增.因此,此处仅作为参考列出.
Ultimately I want the new column purch_revenue
to show the revenue generated by the purchase in every row.
As a workaround, I have also tried to introduce a purchase identifier purch_id
which is incremented each time a purchase was made. So this is listed just as a reference.
+-------+--------+-------+-------------+--------+
|user_id|visit_id|revenue|purch_revenue|purch_id|
+-------+--------+-------+-------------+--------+
| 1| 1| 0| 100| 1|
| 1| 2| 0| 100| 1|
| 1| 3| 0| 100| 1|
| 1| 4| 100| 100| 1|
| 1| 5| 0| 100| 2|
| 1| 6| 0| 100| 2|
| 1| 7| 200| 100| 2|
| 1| 8| 0| 100| 3|
| 1| 9| 10| 100| 3|
+-------+--------+-------+-------------+--------+
我试图像这样使用lag/lead
函数:
I've tried to use the lag/lead
function like this:
user_timeline = Window.partitionBy("user_id").orderBy("visit_id")
find_rev = fn.when(fn.col("revenue") > 0,fn.col("revenue"))\
.otherwise(fn.lead(fn.col("revenue"), 1).over(user_timeline))
df.withColumn("purch_revenue", find_rev)
如果revenue > 0
,这将复制收入列,并将其向上拉一行.显然,我可以将其链接为有限的N,但这不是解决方案.
This duplicates the revenue column if revenue > 0
and also pulls it up by one row. Clearly, I can chain this for a finite N, but that's not a solution.
- 是否有一种方法可以递归地应用到
revenue > 0
? - 或者,有没有一种方法可以根据条件来增加值?我试图找出一种方法来解决这个问题,但是却很难找到一个方法.
推荐答案
窗口函数不支持递归,但此处不需要.这种累加可以很容易地通过累积和来处理:
Window functions don't support recursion but it is not required here. This type of sesionization can be easily handled with cumulative sum:
from pyspark.sql.functions import col, sum, when, lag
from pyspark.sql.window import Window
w = Window.partitionBy("user_id").orderBy("visit_id")
purch_id = sum(lag(when(
col("revenue") > 0, 1).otherwise(0),
1, 0
).over(w)).over(w) + 1
df.withColumn("purch_id", purch_id).show()
+-------+--------+-------+--------+
|user_id|visit_id|revenue|purch_id|
+-------+--------+-------+--------+
| 1| 1| 0| 1|
| 1| 2| 0| 1|
| 1| 3| 0| 1|
| 1| 4| 100| 1|
| 1| 5| 0| 2|
| 1| 6| 0| 2|
| 1| 7| 200| 2|
| 1| 8| 0| 3|
| 1| 9| 10| 3|
+-------+--------+-------+--------+
这篇关于Spark-带有递归的窗口? -有条件地跨行传播值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!