Spark SQL sliding window difference computation

Problem Description

How can I compute a sliding window in Spark without resorting to Spark Streaming?

NOTICE: I do not want to use a WINDOW PARTITION BY ORDER BY k ROWS before/after the current one, but to use the timestamp. The window operator has such a mode:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([{"a": "x", "b": "2021-02-21 01:00:00", "c": "3"},
                            {"a": "x", "b": "2021-02-21 02:00:00", "c": "4"},
                            {"a": "x", "b": "2021-02-21 03:00:00", "c": "2"}])

hour_interval = str(4) + ' hour'
sliding_window = str(60) + ' minute'
from pyspark.sql.functions import col, min, max, sum, when, lit, window, date_format
import time

df_aggregated_time_window = df.groupBy("a", window("b", windowDuration=hour_interval,
                                slideDuration=sliding_window, startTime="30 minute")).agg(min("c").alias("min_c"))
df_aggregated_time_window.show(truncate=False)

+---+------------------------------------------+-----+
|a  |window                                    |min_c|
+---+------------------------------------------+-----+
|x  |[2021-02-20 23:00:00, 2021-02-21 03:00:00]|3    |
|x  |[2021-02-21 00:00:00, 2021-02-21 04:00:00]|2    |
|x  |[2021-02-20 22:00:00, 2021-02-21 02:00:00]|3    |
|x  |[2021-02-21 02:00:00, 2021-02-21 06:00:00]|2    |
|x  |[2021-02-21 01:00:00, 2021-02-21 05:00:00]|2    |
|x  |[2021-02-21 03:00:00, 2021-02-21 07:00:00]|2    |
+---+------------------------------------------+-----+

My desired result would return one output row for each of the 3 input rows, computed as the sliding delta of the 4-hour time-based window (= state), which advances by one hour and is triggered once every hour (however, as this is batch and not streaming, triggering should not matter so much).

Instead, I get the result above, whose cardinality is greater than the number of desired rows.

Edit

Desired output:

Input:

x,"2021-02-21 01:00:00",3
x,"2021-02-21 02:00:00",4
x,"2021-02-21 03:00:00",4
x,"2021-02-21 04:00:00",1

Output:

x,"2021-02-21 01:00:00",NULL // no previous record to be found in the previous 3 hours (including self)
x,"2021-02-21 02:00:00",3    // we currently only compute `min` for simplicity (later it should be max - min to see the deltas); within the last 3 hours the value is 3 (coincidentally the previous row)
x,"2021-02-21 03:00:00",3    // within the 4-hour window, 3 is still the smallest
x,"2021-02-21 04:00:00",1    // within the previous <= 3 hours (including self), 1 is the smallest

Solution

I'm afraid your assumption about the window expression is incorrect. According to its documentation here:

def window(timeColumn: Column, windowDuration: String, slideDuration: String, startTime: String): Column

Bucketize rows into one or more time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05). ...

So in your case of a 4-hour window and a 1-hour sliding step, there are going to be 6 buckets over which the aggregations can be applied:

[2021-02-20 22:00:00, 2021-02-21 02:00:00)  <-- first bucket that contains the earliest b = 2021-02-21 01:00:00
[2021-02-20 23:00:00, 2021-02-21 03:00:00)
[2021-02-21 00:00:00, 2021-02-21 04:00:00)
[2021-02-21 01:00:00, 2021-02-21 05:00:00)
[2021-02-21 02:00:00, 2021-02-21 06:00:00)
[2021-02-21 03:00:00, 2021-02-21 07:00:00) <-- last bucket that contains the latest b = 2021-02-21 03:00:00
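
For illustration, here is a small Python sketch (not part of the original answer) that enumerates, for a given timestamp, the 4-hour buckets that contain it. It assumes the buckets start on whole hours (i.e. a startTime offset of 0), which matches the boundaries listed above; each row falls into windowDuration / slideDuration = 4 buckets, and the three input rows together cover the 6 distinct buckets.

from datetime import datetime, timedelta

def buckets_containing(ts, window_hours=4, slide_hours=1):
    # Enumerate the [start, end) windows that contain ts, assuming window
    # starts are aligned to whole hours (startTime = 0).
    windows = []
    # Earliest start that can still contain ts, truncated down to the hour grid.
    start = (ts - timedelta(hours=window_hours - slide_hours)).replace(
        minute=0, second=0, microsecond=0)
    while start <= ts:
        windows.append((start, start + timedelta(hours=window_hours)))
        start += timedelta(hours=slide_hours)
    return windows

all_buckets = set()
for t in ["2021-02-21 01:00:00", "2021-02-21 02:00:00", "2021-02-21 03:00:00"]:
    ts = datetime.strptime(t, "%Y-%m-%d %H:%M:%S")
    bs = buckets_containing(ts)
    print(t, "falls into", len(bs), "buckets")   # 4 buckets per row
    all_buckets.update(bs)

print("distinct buckets overall:", len(all_buckets))  # prints 6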

I do not entirely understand "I do not want to use a WINDOW PARTITION BY ORDER BY...", because that is exactly what would allow you to efficiently fulfil your requirement: one output row for each input row, computed as the state of the current hour and the previous 3 hours.
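
If the concern with WINDOW PARTITION BY ORDER BY was that it only supports row-count frames, note that rangeBetween lets the frame be defined on the timestamp itself. Below is a minimal sketch (not from the original answer; column names are reused from the question and the values are taken from the desired-output example, with c as an integer for simplicity) that computes, for every input row, the minimum of c over the current row and the preceding 3 hours:

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("x", "2021-02-21 01:00:00", 3),
     ("x", "2021-02-21 02:00:00", 4),
     ("x", "2021-02-21 03:00:00", 4),
     ("x", "2021-02-21 04:00:00", 1)],
    ["a", "b", "c"])

# rangeBetween operates on the numeric ORDER BY expression, so order by the
# timestamp in epoch seconds and look back 3 hours (10800 seconds).
w = (Window.partitionBy("a")
     .orderBy(F.unix_timestamp("b"))
     .rangeBetween(-3 * 3600, Window.currentRow))

df.withColumn("min_c_last_4h", F.min("c").over(w)).show(truncate=False)

This yields exactly one output row per input row. Whether the very first row should report NULL (no earlier record) or its own value, and the eventual max - min delta, are refinements of the same frame definition.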
