How to aggregate over rolling time window with groups in Spark


Question

I have some data that I want to group by a certain column, then aggregate a series of fields based on a rolling time window from the group.

以下是一些示例数据:

from pyspark.sql import Row

df = spark.createDataFrame([Row(date='2016-01-01', group_by='group1', get_avg=5, get_first=1),
                            Row(date='2016-01-10', group_by='group1', get_avg=5, get_first=2),
                            Row(date='2016-02-01', group_by='group2', get_avg=10, get_first=3),
                            Row(date='2016-02-28', group_by='group2', get_avg=20, get_first=3),
                            Row(date='2016-02-29', group_by='group2', get_avg=30, get_first=3),
                            Row(date='2016-04-02', group_by='group2', get_avg=8, get_first=4)])

I want to group by group_by, then create time windows that start at the earliest date and extend until there are 30 days with no entry for that group. After those 30 days are over, the next time window would start with the date of the next row that did not fall in the previous window.

I then want to aggregate, for example getting the average of get_avg, and the first result of get_first.

So the output for this example should be:

group_by    first date of window    get_avg  get_first
group1      2016-01-01              5        1
group2      2016-02-01              20       3
group2      2016-04-02              8        4

edit: sorry I realized my question was not specified properly. I actually want a window that ends after 30 days of inactivity. I have modified the group2 portion of the example accordingly.

Answer

Revised answer:

You can use a simple window functions trick here. A bunch of imports:

from pyspark.sql.functions import coalesce, col, datediff, lag, lit, sum as sum_
from pyspark.sql.window import Window

The window definition:

w = Window.partitionBy("group_by").orderBy("date")

Cast date to DateType:

df_ = df.withColumn("date", col("date").cast("date"))

Define the following expressions:

# Difference from the previous record or 0 if this is the first one
diff = coalesce(datediff("date", lag("date", 1).over(w)), lit(0))

# 0 if diff <= 30, 1 otherwise
indicator = (diff > 30).cast("integer")

# Cumulative sum of indicators over the window
subgroup = sum_(indicator).over(w).alias("subgroup")

Add the subgroup expression to the table and aggregate:

df_.select("*", subgroup).groupBy("group_by", "subgroup").avg("get_avg").show()

+--------+--------+------------+
|group_by|subgroup|avg(get_avg)|
+--------+--------+------------+
|  group1|       0|         5.0|
|  group2|       0|        20.0|
|  group2|       1|         8.0|
+--------+--------+------------+

first is not meaningful with aggregations here, but if the column is monotonically increasing you can use min. Otherwise you'll have to use window functions as well.
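
For example, a minimal sketch, assuming get_first is non-decreasing within each subgroup (as it is in the sample data), that produces all three columns asked for in the question:

from pyspark.sql.functions import avg, min as min_

(df_
 .select("*", subgroup)
 .groupBy("group_by", "subgroup")
 .agg(min_("date").alias("first date of window"),   # earliest date in the subgroup
      avg("get_avg").alias("get_avg"),
      min_("get_first").alias("get_first"))
 .show())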

Tested with Spark 2.1. May require subqueries and a Window instance when used with an earlier Spark release.
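
For older releases, a rough sketch of that two-step split (an untested assumption, reusing the expressions defined above): compute the indicator in one select, then run the cumulative sum over the materialized column.

# Untested sketch for older Spark releases: materialize the indicator
# column first, then apply the cumulative sum in a second select.
df_indicator = df_.select("*", indicator.alias("indicator"))
df_subgroups = df_indicator.select(
    "*", sum_("indicator").over(w).alias("subgroup"))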

The original answer (not relevant in the specified scope)

Since Spark 2.0 you should be able to use a window function:

Bucketize rows into one or more time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05).

from pyspark.sql.functions import window

df.groupBy(window("date", windowDuration="30 days")).count()

but as you can see from the result,

+---------------------------------------------+-----+
|window                                       |count|
+---------------------------------------------+-----+
|[2016-01-30 01:00:00.0,2016-02-29 01:00:00.0]|1    |
|[2015-12-31 01:00:00.0,2016-01-30 01:00:00.0]|2    |
|[2016-03-30 02:00:00.0,2016-04-29 02:00:00.0]|1    |
+---------------------------------------------+-----+

you'll have to be a bit careful when it comes to timezones.
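
One way to make that more predictable, assuming Spark 2.2+ where the spark.sql.session.timeZone option is available, is to pin the session time zone (for example to UTC) before bucketing:

# Assumption: Spark 2.2+ session time zone setting; pins how the date
# strings are interpreted when cast to timestamps for bucketing.
spark.conf.set("spark.sql.session.timeZone", "UTC")

df.groupBy(window("date", windowDuration="30 days")).count().show(truncate=False)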
