How to aggregate over '1 hour' windows cumulatively within a day in PySpark
Question
I have a Spark DataFrame like the one below:
+---------+--------------------------+
|group_id |event_time |
+---------+--------------------------+
|XXXX |2017-10-25 14:47:02.717013|
|XXXX |2017-10-25 14:47:25.444979|
|XXXX |2017-10-25 14:49:32.21353 |
|YYYY |2017-10-25 14:50:38.321134|
|YYYY |2017-10-25 14:51:12.028447|
|ZZZZ |2017-10-25 14:51:24.810688|
|YYYY |2017-10-25 14:37:34.241097|
|ZZZZ |2017-10-25 14:37:24.427836|
|XXXX |2017-10-25 14:37:24.620864|
|YYYY |2017-10-25 14:37:24.964614|
+---------+--------------------------+
I want to calculate the rolling count of events per hour within a day per group_id.
So, for the datetime 25-10 14:00 and for a given group_id, I want to calculate the count of events for that group_id from 25-10 00:00 till 25-10 14:00.
Doing the following:
from pyspark.sql.functions import count, lit, window

df.groupBy('group_id', window('event_time', '1 hour').alias('model_window')) \
    .agg(count(lit(1)).alias('values'))
calculates the count of events per hour, but not cumulatively within each day.
Any ideas?
EDIT: The expected output would be something like:
+---------+---------------------------------------------+-------+
|group_id |model_window |values |
+---------+---------------------------------------------+-------+
|XXXX |[2017-10-25 00:00:00.0,2017-10-25 01:00:00.0]| 10 |
|XXXX |[2017-10-25 00:00:00.0,2017-10-25 02:00:00.0]| 17 |
|XXXX |[2017-10-25 00:00:00.0,2017-10-25 03:00:00.0]| 22 |
|YYYY |[2017-10-25 00:00:00.0,2017-10-25 01:00:00.0]| 0 |
|YYYY |[2017-10-25 00:00:00.0,2017-10-25 02:00:00.0]| 1 |
|YYYY |[2017-10-25 00:00:00.0,2017-10-25 03:00:00.0]| 9 |
+---------+---------------------------------------------+-------+
Answer
want to calculate ... per hour within a day per group_id.
Extract the date and hour:
from pyspark.sql.functions import col, count, hour, sum
extended = (df
.withColumn("event_time", col("event_time").cast("timestamp"))
.withColumn("date", col("event_time").cast("date"))
.withColumn("hour", hour(col("event_time"))))
Compute the aggregates:
aggs = extended.groupBy("group_id", "date", "hour").count()
I want to calculate the rolling count of events
and use window functions:
from pyspark.sql.window import Window
aggs.withColumn(
"agg_count",
sum("count").over(Window.partitionBy("group_id", "date").orderBy("hour")))
To get 0 for missing intervals you'll have to generate reference data for each date and hour and join it.
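A minimal sketch of that join, reusing the extended and aggs DataFrames from above and assuming Spark 2.4+ for sequence (the hours_ref and filled names are only illustrative):

from pyspark.sql.functions import coalesce, col, explode, lit, sequence, sum
from pyspark.sql.window import Window

# Reference grid: one row per (group_id, date, hour), hours 0 through 23.
hours_ref = (extended
    .select("group_id", "date").distinct()
    .withColumn("hour", explode(sequence(lit(0), lit(23)))))

# Left-join the hourly counts, fill missing hours with 0, then accumulate.
filled = (hours_ref
    .join(aggs, ["group_id", "date", "hour"], "left")
    .withColumn("count", coalesce(col("count"), lit(0)))
    .withColumn(
        "agg_count",
        sum("count").over(Window.partitionBy("group_id", "date").orderBy("hour"))))

On versions before 2.4 the 0-23 range can instead be built with a small range DataFrame and a crossJoin.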
With df defined as:
df = sc.parallelize([
("XXXX", "2017-10-25 01:47:02.717013"),
("XXXX", "2017-10-25 14:47:25.444979"),
("XXXX", "2017-10-25 14:49:32.21353"),
("YYYY", "2017-10-25 14:50:38.321134"),
("YYYY", "2017-10-25 14:51:12.028447"),
("ZZZZ", "2017-10-25 14:51:24.810688"),
("YYYY", "2017-10-25 14:37:34.241097"),
("ZZZZ", "2017-10-25 14:37:24.427836"),
("XXXX", "2017-10-25 22:37:24.620864"),
("YYYY", "2017-10-25 16:37:24.964614")
]).toDF(["group_id", "event_time"])
the result is:
+--------+----------+----+-----+---------+
|group_id| date|hour|count|agg_count|
+--------+----------+----+-----+---------+
| XXXX|2017-10-25| 1| 1| 1|
| XXXX|2017-10-25| 14| 2| 3|
| XXXX|2017-10-25| 22| 1| 4|
| ZZZZ|2017-10-25| 14| 2| 2|
| YYYY|2017-10-25| 14| 3| 3|
| YYYY|2017-10-25| 16| 1| 4|
+--------+----------+----+-----+---------+