How to avoid multiple window functions in an expression in pyspark


Question

I want Spark to avoid creating two separate window stages for the same window object that is used twice in my code.

In the following example, how can I use the window only once and tell Spark to do the sum and the division under a single window?

df = df.withColumn("colum_c", 
            f.sum(f.col("colum_a")).over(window) /
            f.sum(f.col("colum_b")).over(window))

Example:

days = lambda i: (i - 1) * 86400

window = (
    Window()
    .partitionBy(f.col("account_id"))
    .orderBy(f.col("event_date").cast("timestamp").cast("long"))
    .rangeBetween(-days(7), 0)
)

df.withColumn(
    "fea_fuel_consumption_ratio_petrol_diesel_01w",
    (
        f.sum(f.col("fea_fuel_consumption_petrol")).over(window)
        / f.sum(f.col("fea_fuel_consumption_diesel")).over(window)
    ),
).show(1000, False)

Answer

You can use collect_list over just one window and then use the higher-order function aggregate to get your desired result (sum/sum).

df.show() #sample data

#+----------+--------+--------+----------+
#|account_id|column_a|column_b|event_date|
#+----------+--------+--------+----------+
#|         1|      90|      23| 2019-2-23|
#|         1|      45|      12| 2019-2-28|
#|         1|      80|      38| 2019-3-21|
#|         1|      62|      91| 2019-3-24|
#|         2|      21|      11| 2019-3-29|
#|         2|      57|      29| 2019-1-08|
#|         2|      68|      13| 2019-1-12|
#|         2|      19|      14| 2019-1-14|
#+----------+--------+--------+----------+
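
If you want to reproduce the example, a minimal sketch for building this sample DataFrame (an active `spark` session is assumed, and column types are inferred from the output above) could look like:

# Hypothetical reconstruction of the sample data shown above; event_date is kept
# as a string, exactly as it is later cast to timestamp in the window spec.
df = spark.createDataFrame(
    [
        (1, 90, 23, "2019-2-23"),
        (1, 45, 12, "2019-2-28"),
        (1, 80, 38, "2019-3-21"),
        (1, 62, 91, "2019-3-24"),
        (2, 21, 11, "2019-3-29"),
        (2, 57, 29, "2019-1-08"),
        (2, 68, 13, "2019-1-12"),
        (2, 19, 14, "2019-1-14"),
    ],
    ["account_id", "column_a", "column_b", "event_date"],
)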

from pyspark.sql import functions as f
from pyspark.sql.window import Window

days = lambda i: i * 86400

window =\
    Window()\
    .partitionBy(f.col("account_id"))\
    .orderBy(f.col("event_date").cast("timestamp").cast("long"))\
    .rangeBetween(-days(7), 0)

df.withColumn("column_c",f.collect_list(f.array("column_a","column_b")).over(window))\
  .withColumn("column_c", f.expr("""aggregate(column_c,0,(acc,x)-> int(x[0])+acc)/\
                               aggregate(column_c,0,(acc,x)-> int(x[1])+acc)""")).show()

#+----------+--------+--------+----------+------------------+
#|account_id|column_a|column_b|event_date|          column_c|
#+----------+--------+--------+----------+------------------+
#|         1|      90|      23| 2019-2-23|3.9130434782608696|
#|         1|      45|      12| 2019-2-28| 3.857142857142857|
#|         1|      80|      38| 2019-3-21|2.1052631578947367|
#|         1|      62|      91| 2019-3-24|1.1007751937984496|
#|         2|      57|      29| 2019-1-08|1.9655172413793103|
#|         2|      68|      13| 2019-1-12|2.9761904761904763|
#|         2|      19|      14| 2019-1-14|2.5714285714285716|
#|         2|      21|      11| 2019-3-29|1.9090909090909092|
#+----------+--------+--------+----------+------------------+
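
If you are on Spark 3.1 or later, the same sum/sum can also be written with pyspark.sql.functions.aggregate instead of the expr() string. This is only a sketch of an equivalent formulation (the intermediate column name "pairs" is made up here), not part of the original answer:

from pyspark.sql import functions as f

# Spark >= 3.1: DataFrame-API version of the same collect_list + aggregate idea.
result = (
    df.withColumn("pairs", f.collect_list(f.array("column_a", "column_b")).over(window))
      .withColumn(
          "column_c",
          f.aggregate("pairs", f.lit(0.0), lambda acc, x: acc + x[0])
          / f.aggregate("pairs", f.lit(0.0), lambda acc, x: acc + x[1]),
      )
      .drop("pairs")
)
result.show()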

As you can see in the physical plan below, with this approach there is only one windowspecdefinition / specifiedwindowframe, hence only one window is used.

.explain()

== Physical Plan ==
*(2) Project [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, (cast(aggregate(column_c#6838, 0, lambdafunction((cast(lambda x#6846[0] as int) + lambda acc#6845), lambda acc#6845, lambda x#6846, false), lambdafunction(lambda id#6847, lambda id#6847, false)) as double) / cast(aggregate(column_c#6838, 0, lambdafunction((cast(lambda x#6849[1] as int) + lambda acc#6848), lambda acc#6848, lambda x#6849, false), lambdafunction(lambda id#6850, lambda id#6850, false)) as double)) AS column_c#6844]
+- Window [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, collect_list(_w1#6857, 0, 0) windowspecdefinition(account_id#4848L, _w0#6856L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -518400, currentrow$())) AS column_c#6838], [account_id#4848L], [_w0#6856L ASC NULLS FIRST]
   +- Sort [account_id#4848L ASC NULLS FIRST, _w0#6856L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(account_id#4848L, 200), [id=#1554]
         +- *(1) Project [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, cast(cast(event_date#4851 as timestamp) as bigint) AS _w0#6856L, array(column_a#4850L, column_b#4849L) AS _w1#6857]
            +- *(1) Scan ExistingRDD[account_id#4848L,column_b#4849L,column_a#4850L,event_date#4851]

Instead of the following (2 windows):

df.withColumn("colum_c",f.sum(f.col("column_a")).over(window)\
                              /f.sum(f.col("column_b")).over(window)).show()

In this physical plan, we can see 2 instances of windowspecdefinition / specifiedwindowframe, hence 2 windows are used.

.explain()

== Physical Plan ==
Window [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, (cast(sum(column_a#4850L) windowspecdefinition(account_id#4848L, _w0#6804L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -604800, currentrow$())) as double) / cast(sum(column_b#4849L) windowspecdefinition(account_id#4848L, _w0#6804L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -604800, currentrow$())) as double)) AS colum_c#6798], [account_id#4848L], [_w0#6804L ASC NULLS FIRST]
+- Sort [account_id#4848L ASC NULLS FIRST, _w0#6804L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(account_id#4848L, 200), [id=#1453]
      +- *(1) Project [account_id#4848L, column_b#4849L, column_a#4850L, event_date#4851, cast(cast(event_date#4851 as timestamp) as bigint) AS _w0#6804L]
         +- *(1) Scan ExistingRDD[account_id#4848L,column_b#4849L,column_a#4850L,event_date#4851]
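
To compare the two variants without reading the plans by eye, one rough way is to count the window definitions in the executed-plan string. Note that _jdf and queryExecution() are internal PySpark/JVM handles, so this is a fragile, version-dependent sketch:

# Fragile sketch: counts "windowspecdefinition" occurrences in the executed plan.
# _jdf / queryExecution() are internal APIs and may change between Spark versions.
def count_window_specs(df):
    plan = df._jdf.queryExecution().executedPlan().toString()
    return plan.count("windowspecdefinition")

Against the two plans shown above, this would return 1 for the collect_list version and 2 for the plain sum/sum version.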
