How to aggregate over rolling time window with groups in Spark

Question

I have some data that I want to group by a certain column, then aggregate a series of fields based on a rolling time window within each group.

Here is some sample data:

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([Row(date='2016-01-01', group_by='group1', get_avg=5, get_first=1),
                            Row(date='2016-01-10', group_by='group1', get_avg=5, get_first=2),
                            Row(date='2016-02-01', group_by='group2', get_avg=10, get_first=3),
                            Row(date='2016-02-28', group_by='group2', get_avg=20, get_first=3),
                            Row(date='2016-02-29', group_by='group2', get_avg=30, get_first=3),
                            Row(date='2016-04-02', group_by='group2', get_avg=8, get_first=4)])

I want to group by group_by, then create time windows that start at the earliest date and extend until there have been 30 days with no entry for that group. After those 30 days are over, the next time window starts with the date of the next row that did not fall in the previous window.

I then want to aggregate, for example getting the average of get_avg, and the first result of get_first.

So the output for this example should be:

group_by    first date of window    get_avg  get_first
group1      2016-01-01              5        1
group2      2016-02-01              20       3
group2      2016-04-02              8        4

Edit: sorry, I realized my question was not specified properly. I actually want a window that ends after 30 days of inactivity. I have modified the group2 portion of the example accordingly.

Answer

Revised answer:

You can use a simple window functions trick here. A bunch of imports:

from pyspark.sql.functions import coalesce, col, datediff, lag, lit, sum as sum_
from pyspark.sql.window import Window

The window definition:

w = Window.partitionBy("group_by").orderBy("date")

Cast date to DateType:

df_ = df.withColumn("date", col("date").cast("date"))

Define the following expressions:

# Difference from the previous record or 0 if this is the first one
diff = coalesce(datediff("date", lag("date", 1).over(w)), lit(0))

# 0 if diff <= 30, 1 otherwise
indicator = (diff > 30).cast("integer")

# Cumulative sum of indicators over the window
subgroup = sum_(indicator).over(w).alias("subgroup")
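
To make the mechanics concrete, here is an illustrative projection of these intermediate columns onto the sample data (this inspection step is not part of the original answer). For group2 the gaps between consecutive dates are 27, 1, and 33 days, so only the last row opens a new subgroup:

# Illustration only: inspect the intermediate columns before aggregating
df_.select("group_by", "date",
           diff.alias("diff"),
           indicator.alias("indicator"),
           subgroup).show()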

Add the subgroup expression to the table:

df_.select("*", subgroup).groupBy("group_by", "subgroup").avg("get_avg")

+--------+--------+------------+
|group_by|subgroup|avg(get_avg)|
+--------+--------+------------+
|  group1|       0|         5.0|
|  group2|       0|        20.0|
|  group2|       1|         8.0|
+--------+--------+------------+

first is not meaningful with aggregations, but if the column is monotonically increasing you can use min. Otherwise you'll have to use window functions as well.
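
For example, here is a sketch of how the full aggregation from the question could look, using min both for the window start date and for get_first. This assumes get_first does not decrease within a subgroup (true for the sample data), and the alias names are illustrative rather than from the original answer:

from pyspark.sql.functions import avg, min as min_

(df_.select("*", subgroup)
    .groupBy("group_by", "subgroup")
    .agg(min_("date").alias("window_start"),
         avg("get_avg").alias("get_avg"),
         min_("get_first").alias("get_first"))
    .show())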

Tested with Spark 2.1. It may require subqueries and a Window instance when used with an earlier Spark release.

Original answer (not relevant to the scope as specified):

Since Spark 2.0 you should be able to use a window function:

Bucketize rows into one or more time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05).

from pyspark.sql.functions import window

df.groupBy(window("date", windowDuration="30 days")).count().show(truncate=False)

but as you can see from the result,

+---------------------------------------------+-----+
|window                                       |count|
+---------------------------------------------+-----+
|[2016-01-30 01:00:00.0,2016-02-29 01:00:00.0]|1    |
|[2015-12-31 01:00:00.0,2016-01-30 01:00:00.0]|2    |
|[2016-03-30 02:00:00.0,2016-04-29 02:00:00.0]|1    |
+---------------------------------------------+-----+

you'll have to be a bit careful when it comes to timezones.
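
If the local offsets above are unwanted, one option is to pin the session time zone before bucketizing. This is only a sketch and assumes Spark 2.2+, where the spark.sql.session.timeZone setting exists; it is not part of the original answer:

# Fix the session time zone so window boundaries are reported in UTC
# rather than with the local offset shown above (Spark 2.2+).
spark.conf.set("spark.sql.session.timeZone", "UTC")
df.groupBy(window("date", windowDuration="30 days")).count().show(truncate=False)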
