How to aggregate over '1 hour' windows cumulatively within a day in PySpark

Problem description

I have a Spark DataFrame like the one below:

+---------+--------------------------+
|group_id |event_time                |
+---------+--------------------------+
|XXXX     |2017-10-25 14:47:02.717013|
|XXXX     |2017-10-25 14:47:25.444979|
|XXXX     |2017-10-25 14:49:32.21353 |
|YYYY     |2017-10-25 14:50:38.321134|
|YYYY     |2017-10-25 14:51:12.028447|
|ZZZZ     |2017-10-25 14:51:24.810688|
|YYYY     |2017-10-25 14:37:34.241097|
|ZZZZ     |2017-10-25 14:37:24.427836|
|XXXX     |2017-10-25 14:37:24.620864|
|YYYY     |2017-10-25 14:37:24.964614|
+---------+--------------------------+

I want to calculate the rolling count of events per hour within a day per group_id.

So, for the datetime 25-10 14:00 and a given group_id, I want the count of events for that group_id from 25-10 00:00 up to 25-10 14:00.

Doing something like the following:

from pyspark.sql.functions import window, count, lit

df.groupBy('group_id', window('event_time', '1 hour').alias('model_window')) \
    .agg(count(lit(1)).alias('values'))

calculates the count of events per hour, but not cumulatively during each day.

Any ideas?

EDIT: The expected output would be something like:

    +---------+---------------------------------------------+-------+
    |group_id |model_window                                 |values |         
    +---------+---------------------------------------------+-------+
    |XXXX     |[2017-10-25 00:00:00.0,2017-10-25 01:00:00.0]| 10    |
    |XXXX     |[2017-10-25 00:00:00.0,2017-10-25 02:00:00.0]| 17    |
    |XXXX     |[2017-10-25 00:00:00.0,2017-10-25 03:00:00.0]| 22    |
    |YYYY     |[2017-10-25 00:00:00.0,2017-10-25 01:00:00.0]| 0     |
    |YYYY     |[2017-10-25 00:00:00.0,2017-10-25 02:00:00.0]| 1     |
    |YYYY     |[2017-10-25 00:00:00.0,2017-10-25 03:00:00.0]| 9     |
    +---------+---------------------------------------------+-------+

Solution

want to calculate ... per hour within a day per group_id.

Extract date and hour:

from pyspark.sql.functions import col, count, hour, sum

extended = (df
  .withColumn("event_time", col("event_time").cast("timestamp"))
  .withColumn("date", col("event_time").cast("date"))
  .withColumn("hour", hour(col("event_time"))))

Compute aggregates:

aggs = extended.groupBy("group_id", "date", "hour").count()

I want to calculate the rolling count of events

And use window functions:

from pyspark.sql.window import Window

# running total of the hourly counts within each (group_id, date)
aggs.withColumn(
    "agg_count",
    sum("count").over(Window.partitionBy("group_id", "date").orderBy("hour")))

To get 0 for missing intervals, you'll have to generate reference data for each date and hour and join it in, as sketched below.
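A minimal sketch of that join, assuming the extended and aggs DataFrames built above and an active SparkSession named spark (the original code only shows sc, so the session name is an assumption); crossJoin requires Spark 2.1+:

from pyspark.sql.functions import coalesce, col, lit, sum
from pyspark.sql.window import Window

# Reference grid: all 24 hours for every (group_id, date) pair seen in the data.
hours = spark.range(24).select(col("id").cast("int").alias("hour"))
grid = extended.select("group_id", "date").distinct().crossJoin(hours)

# Left-join the hourly counts, fill the gaps with 0,
# then reuse the same cumulative window as above.
filled = (grid
    .join(aggs, ["group_id", "date", "hour"], "left")
    .withColumn("count", coalesce(col("count"), lit(0)))
    .withColumn(
        "agg_count",
        sum("count").over(
            Window.partitionBy("group_id", "date").orderBy("hour"))))

Note that this grid only covers (group_id, date) pairs that actually occur in the data; producing rows for groups with no events at all on a given date would need a full cross product of group IDs and dates instead.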

With df defined as:

df = sc.parallelize([
    ("XXXX", "2017-10-25 01:47:02.717013"),
    ("XXXX", "2017-10-25 14:47:25.444979"),
    ("XXXX", "2017-10-25 14:49:32.21353"),
    ("YYYY", "2017-10-25 14:50:38.321134"),
    ("YYYY", "2017-10-25 14:51:12.028447"),
    ("ZZZZ", "2017-10-25 14:51:24.810688"),
    ("YYYY", "2017-10-25 14:37:34.241097"),
    ("ZZZZ", "2017-10-25 14:37:24.427836"),
    ("XXXX", "2017-10-25 22:37:24.620864"),
    ("YYYY", "2017-10-25 16:37:24.964614")
]).toDF(["group_id", "event_time"])

the result is

+--------+----------+----+-----+---------+                                      
|group_id|      date|hour|count|agg_count|
+--------+----------+----+-----+---------+
|    XXXX|2017-10-25|   1|    1|        1|
|    XXXX|2017-10-25|  14|    2|        3|
|    XXXX|2017-10-25|  22|    1|        4|
|    ZZZZ|2017-10-25|  14|    2|        2|
|    YYYY|2017-10-25|  14|    3|        3|
|    YYYY|2017-10-25|  16|    1|        4|
+--------+----------+----+-----+---------+
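If the output should match the window layout in the question ([day start, end of hour]), one way is to derive the bounds from date and hour. A rough sketch, assuming the cumulative frame produced by the withColumn step above has been assigned to a variable named cumulative (a name not used in the original answer):

from pyspark.sql.functions import col, struct

with_window = (cumulative
    # window start: midnight of the day; window end: hour + 1 full hours later
    .withColumn("window_start", col("date").cast("timestamp"))
    .withColumn(
        "window_end",
        (col("date").cast("timestamp").cast("long")
            + (col("hour") + 1) * 3600).cast("timestamp"))
    .select(
        "group_id",
        struct("window_start", "window_end").alias("model_window"),
        col("agg_count").alias("values")))

The timestamp-to-long casts use the session time zone, so the arithmetic can be off by an hour around DST transitions.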
