How to avoid multiple window functions in an expression in PySpark
Question
I want Spark to avoid creating two separate window stages for the same window object used twice in my code. In the following example, how can I define the window once and tell Spark to do both the sum and the division under a single window?
df = df.withColumn(
    "colum_c",
    f.sum(f.col("colum_a")).over(window) / f.sum(f.col("colum_b")).over(window),
)
Example:
days = lambda i: (i - 1) * 86400  # window length in seconds; rangeBetween operates on the long-cast timestamp

window = (
    Window()
    .partitionBy(f.col("account_id"))
    .orderBy(f.col("event_date").cast("timestamp").cast("long"))
    .rangeBetween(-days(7), 0)
)
df.withColumn(
    "fea_fuel_consumption_ratio_petrol_diesel_01w",
    f.sum(f.col("fea_fuel_consumption_petrol")).over(window)
    / f.sum(f.col("fea_fuel_consumption_diesel")).over(window),
).show(1000, False)
Answer
You could use collect_list over only one window, and then use the higher-order function aggregate to get your desired result (sum/sum).
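For context, aggregate(array, start, merge) is a SQL higher-order function (available since Spark 2.4) that folds an array into a single value. Here is a standalone illustration; the demo DataFrame below is mine, not from the question. Note the int(x) cast: the accumulator keeps the type of the start value (0 is an int), so the bigint array elements must be cast before being added, which is also why the code further down casts x[0] and x[1].
from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.getOrCreate()
demo = spark.createDataFrame([([1, 2, 3],)], ["xs"])  # xs is array<bigint>

# Fold the array into its sum: 0 + 1 + 2 + 3 = 6.
demo.select(f.expr("aggregate(xs, 0, (acc, x) -> acc + int(x))").alias("total")).show()
#+-----+
#|total|
#+-----+
#|    6|
#+-----+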
df.show() #sample data
#+----------+--------+--------+----------+
#|account_id|column_a|column_b|event_date|
#+----------+--------+--------+----------+
#| 1| 90| 23| 2019-2-23|
#| 1| 45| 12| 2019-2-28|
#| 1| 80| 38| 2019-3-21|
#| 1| 62| 91| 2019-3-24|
#| 2| 21| 11| 2019-3-29|
#| 2| 57| 29| 2019-1-08|
#| 2| 68| 13| 2019-1-12|
#| 2| 19| 14| 2019-1-14|
#+----------+--------+--------+----------+
from pyspark.sql import functions as f
from pyspark.sql.window import Window

days = lambda i: i * 86400  # days to seconds

window = (
    Window()
    .partitionBy(f.col("account_id"))
    .orderBy(f.col("event_date").cast("timestamp").cast("long"))
    .rangeBetween(-days(7), 0)
)
df.withColumn("column_c",f.collect_list(f.array("column_a","column_b")).over(window))\
.withColumn("column_c", f.expr("""aggregate(column_c,0,(acc,x)-> int(x[0])+acc)/\
aggregate(column_c,0,(acc,x)-> int(x[1])+acc)""")).show()
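If you are on Spark 3.1 or later, the same fold can be written with the native pyspark.sql.functions.aggregate API instead of an expr string. A minimal sketch under that assumption, reusing the imports and window defined above:
# Sketch assuming Spark 3.1+: f.aggregate mirrors the SQL aggregate() call above.
df.withColumn(
    "column_c", f.collect_list(f.array("column_a", "column_b")).over(window)
).withColumn(
    "column_c",
    f.aggregate("column_c", f.lit(0), lambda acc, x: acc + x[0].cast("int"))
    / f.aggregate("column_c", f.lit(0), lambda acc, x: acc + x[1].cast("int")),
).show()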
#+----------+--------+--------+----------+------------------+
#|account_id|column_a|column_b|event_date| column_c|
#+----------+--------+--------+----------+------------------+
#| 1| 90| 23| 2019-2-23|3.9130434782608696|
#| 1| 45| 12| 2019-2-28| 3.857142857142857|
#| 1| 80| 38| 2019-3-21|2.1052631578947367|
#| 1| 62| 91| 2019-3-24|1.1007751937984496|
#| 2| 57| 29| 2019-1-08|1.9655172413793103|
#| 2| 68| 13| 2019-1-12|2.9761904761904763|
#| 2| 19| 14| 2019-1-14|2.5714285714285716|
#| 2| 21| 11| 2019-3-29|1.9090909090909092|
#+----------+--------+--------+----------+------------------+
As you can see in the physical plan, with this method there is only 1 windowspecdefinition or specifiedwindowframe, hence only 1 window is used.
.explain()
== Physical Plan ==
*(2) Project [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, (cast(aggregate(column_c#6838, 0, lambdafunction((cast(lambda x#6846[0] as int) + lambda acc#6845), lambda acc#6845, lambda x#6846, false), lambdafunction(lambda id#6847, lambda id#6847, false)) as double) / cast(aggregate(column_c#6838, 0, lambdafunction((cast(lambda x#6849[1] as int) + lambda acc#6848), lambda acc#6848, lambda x#6849, false), lambdafunction(lambda id#6850, lambda id#6850, false)) as double)) AS column_c#6844]
+- Window [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, collect_list(_w1#6857, 0, 0) windowspecdefinition(account_id#4848L, _w0#6856L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -518400, currentrow$())) AS column_c#6838], [account_id#4848L], [_w0#6856L ASC NULLS FIRST]
+- Sort [account_id#4848L ASC NULLS FIRST, _w0#6856L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(account_id#4848L, 200), [id=#1554]
+- *(1) Project [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, cast(cast(event_date#4851 as timestamp) as bigint) AS _w0#6856L, array(column_a#4850L, column_b#4849L) AS _w1#6857]
+- *(1) Scan ExistingRDD[account_id#4848L,column_b#4849L,column_a#4850L,event_date#4851]
Instead of (2 windows):
df.withColumn("colum_c",f.sum(f.col("column_a")).over(window)\
/f.sum(f.col("column_b")).over(window)).show()
In this physical plan, we can see 2 instances of windowspecdefinition or specifiedwindowframe, hence 2 windows are used.
.explain()
== Physical Plan ==
Window [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, (cast(sum(column_a#4850L) windowspecdefinition(account_id#4848L, _w0#6804L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -604800, currentrow$())) as double) / cast(sum(column_b#4849L) windowspecdefinition(account_id#4848L, _w0#6804L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -604800, currentrow$())) as double)) AS colum_c#6798], [account_id#4848L], [_w0#6804L ASC NULLS FIRST]
+- Sort [account_id#4848L ASC NULLS FIRST, _w0#6804L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(account_id#4848L, 200), [id=#1453]
+- *(1) Project [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, cast(cast(event_date#4851 as timestamp) as bigint) AS _w0#6804L]
+- *(1) Scan ExistingRDD[account_id#4848L,column_b#4849L,column_a#4850L,event_date#4851]
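To compare the two plans programmatically rather than by eye, one option is to capture the explain() output (PySpark prints it to stdout) and count the windowspecdefinition occurrences. A small sketch using only the public API:
import io
from contextlib import redirect_stdout

def count_window_specs(frame):
    # DataFrame.explain() prints to stdout, so capture it and count the window specs.
    buf = io.StringIO()
    with redirect_stdout(buf):
        frame.explain()
    return buf.getvalue().count("windowspecdefinition")

# Expect 1 for the collect_list/aggregate version and 2 for the double-sum version.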