Partition by 24 hours and aggregate using pyspark or pandas


Problem description

I have session data for each device, as below:

time-started: timestamp when the device connected

duration (seconds): how long it stayed connected

packets: how many packets were sent

Now I need to aggregate (sum) duration and packet for each device, partitioned into 24-hour windows.

For example:

First record:

Device A started at 1:53 AM on 8 April, so all device A records within the next 24 hours, i.e. up to 1:53 AM on 9 April, should be aggregated together.

The next record for device A then starts at 4:27 AM on 9 April, so all device A records within the following 24 hours, i.e. up to 4:27 AM on 10 April, are aggregated together.

And so on for each device.

Expected output

Test data:

dftest = sc.parallelize([['A','2020-04-08T01:53:54.932000','Org1','wifi',60,372717],
                         ['A','2020-04-08T02:40:38.661000','Org1','wifi',194,819040],
                         ['A','2020-04-08T21:45:10.207000','Org1','wifi',8885,3449150],
                         ['A','2020-04-09T00:15:28.838000','Org1','wifi',14770,3572589],
                         ['A','2020-04-09T04:27:33.424000','Org1','remote',0,0],
                         ['A','2020-04-09T04:29:25.189000','Org1','wifi',60,7495],
                         ['A','2020-04-09T04:44:21.397000','Org1','remote',60,553356],
                         ['A','2020-04-09T04:50:40.406000','Org1','wifi',60,662467],
                         ['A','2020-04-10T00:00:50.636000','Org1','remote',0,72],
                         ['A','2020-04-10T04:20:28.831000','Org1','remote',6,497],
                         ['A','2020-04-10T04:31:35.336000','Org1','remote',0,22],
                         ['B','2020-04-08T21:56:58.502000','Org2','remote',0,0],
                         ['B','2020-04-08T22:01:19.534000','Org2','wifi',0,0],
                         ['B','2020-04-08T22:10:15.891000','Org2','wifi',60,187891],
                         ['B','2020-04-08T22:16:41.499000','Org2','wifi',1620,207674],
                         ['B','2020-04-09T01:55:02.772000','Org2','wifi',360,426232],
                         ['B','2020-04-09T02:03:32.735000','Org2','wifi',60,374827],
                         ['B','2020-04-09T02:06:16.509000','Org2','wifi',60,386518],
                         ['B','2020-04-09T02:13:33.497000','Org2','remote',60,373609],
                         ['B','2020-04-09T02:17:19.176000','Org2','wifi',133,400417],
                         ['B','2020-04-10T23:10:15.654000','Org2','remote',0,212],
                         ['B','2020-04-10T23:10:41.749000','Org2','remote',1,285]
                        ]).toDF(("deviceId","time-started","OrgId","type","duration","packet"))
dftest.show()

Recommended answer

In your case, the next 24-hour window depends on where the last one ended (and on the first record after that end), so this logic can't be expressed with window functions alone. I decoupled the sum computation from pandas (as it would be slow) and used Spark built-in functions to compute the sums; the pandas UDAF basically gives us the desired date groups, and we filter on them to get the desired result.

Iteration is the only way to get the 24-hour segments, so you could use a simple UDF too, but the pandas vectorized UDAF lets us express the logic on small groups (partitioned by deviceId), so it should perform better. Pandas UDAF (Spark 2.3+):

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rolling 24-hour window per device: sum everything from the current row
# up to 24*60*60 seconds after it.
w = Window().partitionBy("deviceId").orderBy(F.col("time-started").cast("long")).rangeBetween(Window.currentRow, 24*60*60)

df2 = dftest.withColumn("time-started", F.to_timestamp("time-started", "yyyy-MM-dd'T'HH:mm:ss"))\
            .withColumn("time-started-2", F.col("time-started"))\
            .withColumn("duration", F.sum("duration").over(w))\
            .withColumn("packet", F.sum("packet").over(w))

@pandas_udf(df2.schema, PandasUDFType.GROUPED_MAP)
def grouped_map(df1):
    # Spark does not guarantee row order within a group, so sort first.
    df1 = df1.sort_values('time-started').reset_index(drop=True)
    # Collapse every row that falls within 24 hours of the current segment
    # start onto that start timestamp; otherwise open a new segment.
    start = df1.loc[0, 'time-started']
    for i in range(1, len(df1)):
        if start + pd.Timedelta(days=1) > df1.loc[i, 'time-started']:
            df1.loc[i, 'time-started'] = start
        else:
            start = df1.loc[i, 'time-started']
    return df1

# Keep only the rows that start a segment; their windowed sums already
# cover the following 24 hours.
df2.groupby('deviceId').apply(grouped_map)\
   .filter(F.col("time-started-2") == F.col("time-started"))\
   .drop("time-started-2")\
   .orderBy("deviceId")\
   .show()

#+--------+-------------------+-----+------+--------+-------+
#|deviceId|       time-started|OrgId|  type|duration| packet|
#+--------+-------------------+-----+------+--------+-------+
#|       A|2020-04-08 01:53:54| Org1|  wifi|   23909|8213496|
#|       A|2020-04-09 04:27:33| Org1|remote|     186|1223887|
#|       A|2020-04-10 04:31:35| Org1|remote|       0|     22|
#|       B|2020-04-08 21:56:58| Org2|remote|    2353|2357168|
#|       B|2020-04-10 23:10:15| Org2|remote|       1|    497|
#+--------+-------------------+-----+------+--------+-------+
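
Note that PandasUDFType.GROUPED_MAP is deprecated on Spark 3.0+; the same logic can be expressed with applyInPandas. A minimal sketch, assuming grouped_map is defined as a plain Python function (without the @pandas_udf decorator) and df2 is the DataFrame built above:

# Spark 3.0+ style: pass the plain function and the output schema explicitly.
result = df2.groupby("deviceId")\
            .applyInPandas(grouped_map, schema=df2.schema)\
            .filter(F.col("time-started-2") == F.col("time-started"))\
            .drop("time-started-2")\
            .orderBy("deviceId")
result.show()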

You can also take a look at a similar question. The solution proposed there was a Scala UDF using foldLeft; I think the pandas grouped-map UDAF is a better alternative.
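
The question also mentions plain pandas as an option; if the per-device data is small enough to fit in memory, the same 24-hour segmentation can be written directly in pandas. This is a minimal sketch, not part of the answer above; the helper name segment_24h and the window-start column are illustrative:

import pandas as pd

# Small pandas frame with the same columns as the test data above (truncated here).
pdf = pd.DataFrame(
    [['A', '2020-04-08T01:53:54.932000', 'Org1', 'wifi', 60, 372717],
     ['A', '2020-04-09T00:15:28.838000', 'Org1', 'wifi', 14770, 3572589],
     ['A', '2020-04-09T04:27:33.424000', 'Org1', 'remote', 0, 0]],
    columns=['deviceId', 'time-started', 'OrgId', 'type', 'duration', 'packet'])
pdf['time-started'] = pd.to_datetime(pdf['time-started'])

def segment_24h(g):
    # Same iteration as the grouped-map UDF: a row within 24 hours of the current
    # segment start belongs to that segment; otherwise it opens a new one.
    g = g.sort_values('time-started').reset_index(drop=True)
    start = g.loc[0, 'time-started']
    window_start = []
    for ts in g['time-started']:
        if ts >= start + pd.Timedelta(days=1):
            start = ts
        window_start.append(start)
    g['window-start'] = window_start
    return g

out = (pdf.groupby('deviceId', group_keys=False).apply(segment_24h)
          .groupby(['deviceId', 'window-start'], as_index=False)[['duration', 'packet']].sum())
print(out)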
