Partition by 24 hours and aggregate using pyspark or pandas

Question

I have session data for each device as below:

time-started: timestamp when the device connected

duration (seconds): how long it was connected

packets: how many packets were sent

Now I need to aggregate (sum) duration and packets for each device, partitioned into rolling 24-hour windows.

For example:

Device A's first record starts at 1:53 AM on 8 April, so we need to aggregate all device A records that fall within the following 24 hours, i.e. up to 1:53 AM on 9 April.

The next segment for device A should then start at 4:27 AM on 9 April and aggregate all device A records within the following 24 hours, i.e. up to 4:27 AM on 10 April.

And so on for each device.
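
To make the rule concrete, here is a minimal, purely illustrative sketch of how the segment starts fall out for device A (start times copied from the test data below, seconds dropped for brevity):

from datetime import datetime, timedelta

# Device A start times from the test data (seconds dropped for brevity)
starts = [
    datetime(2020, 4, 8, 1, 53),  datetime(2020, 4, 8, 2, 40),
    datetime(2020, 4, 8, 21, 45), datetime(2020, 4, 9, 0, 15),
    datetime(2020, 4, 9, 4, 27),  datetime(2020, 4, 9, 4, 29),
    datetime(2020, 4, 9, 4, 44),  datetime(2020, 4, 9, 4, 50),
    datetime(2020, 4, 10, 0, 0),  datetime(2020, 4, 10, 4, 20),
    datetime(2020, 4, 10, 4, 31),
]

# A new 24-hour segment opens only when a record falls outside the
# 24-hour window of the current segment's start.
segment_starts = []
for ts in starts:
    if segment_starts and ts < segment_starts[-1] + timedelta(hours=24):
        continue                      # still inside the current window
    segment_starts.append(ts)         # this record opens a new segment

print([s.strftime("%Y-%m-%d %H:%M") for s in segment_starts])
# ['2020-04-08 01:53', '2020-04-09 04:27', '2020-04-10 04:31']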

Expected output: see the result table in the answer below.

Test data:

dftest = sc.parallelize([
    ['A','2020-04-08T01:53:54.932000','Org1','wifi',60,372717],
    ['A','2020-04-08T02:40:38.661000','Org1','wifi',194,819040],
    ['A','2020-04-08T21:45:10.207000','Org1','wifi',8885,3449150],
    ['A','2020-04-09T00:15:28.838000','Org1','wifi',14770,3572589],
    ['A','2020-04-09T04:27:33.424000','Org1','remote',0,0],
    ['A','2020-04-09T04:29:25.189000','Org1','wifi',60,7495],
    ['A','2020-04-09T04:44:21.397000','Org1','remote',60,553356],
    ['A','2020-04-09T04:50:40.406000','Org1','wifi',60,662467],
    ['A','2020-04-10T00:00:50.636000','Org1','remote',0,72],
    ['A','2020-04-10T04:20:28.831000','Org1','remote',6,497],
    ['A','2020-04-10T04:31:35.336000','Org1','remote',0,22],
    ['B','2020-04-08T21:56:58.502000','Org2','remote',0,0],
    ['B','2020-04-08T22:01:19.534000','Org2','wifi',0,0],
    ['B','2020-04-08T22:10:15.891000','Org2','wifi',60,187891],
    ['B','2020-04-08T22:16:41.499000','Org2','wifi',1620,207674],
    ['B','2020-04-09T01:55:02.772000','Org2','wifi',360,426232],
    ['B','2020-04-09T02:03:32.735000','Org2','wifi',60,374827],
    ['B','2020-04-09T02:06:16.509000','Org2','wifi',60,386518],
    ['B','2020-04-09T02:13:33.497000','Org2','remote',60,373609],
    ['B','2020-04-09T02:17:19.176000','Org2','wifi',133,400417],
    ['B','2020-04-10T23:10:15.654000','Org2','remote',0,212],
    ['B','2020-04-10T23:10:41.749000','Org2','remote',1,285]
]).toDF(("deviceId","time-started","OrgId","type","duration","packet"))
dftest.show()

Answer

For your case, the next 24-hour window depends on where the previous one ends, so this logic can't be expressed with window functions alone. I decoupled the sum computation from pandas (as it would be slow) and used Spark built-in functions over a 24-hour range window to get your sums; the pandas UDF then basically gives us the desired date groups, and we filter on them to get the expected result.

Iteration is the only way to get your 24-hour segments, so you could use a simple UDF too, but the pandas vectorized UDF lets us express the logic on small per-device groups, so it should perform better. Pandas UDFs require Spark 2.3+.

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rolling 24-hour window (in seconds) per device, anchored at the current row.
w = Window().partitionBy("deviceId").orderBy(F.col("time-started").cast("long")).rangeBetween(Window.currentRow, 24*60*60)

df2 = dftest.withColumn("time-started", F.to_timestamp("time-started", "yyyy-MM-dd'T'HH:mm:ss"))\
            .withColumn("time-started-2", F.col("time-started"))\
            .withColumn("duration", F.sum("duration").over(w))\
            .withColumn("packet", F.sum("packet").over(w))

@pandas_udf(df2.schema, PandasUDFType.GROUPED_MAP)
def grouped_map(df1):
    # Collapse each device's rows onto the start of their 24-hour segment:
    # a new segment opens only when a row falls outside the current window.
    df1 = df1.sort_values('time-started').reset_index(drop=True)
    start = df1.loc[0, 'time-started']
    for i in range(1, len(df1)):
        if start + pd.Timedelta(days=1) > df1.loc[i, 'time-started']:
            df1.loc[i, 'time-started'] = start
        else:
            start = df1.loc[i, 'time-started']
    return df1

# Keep only the rows that open a segment (their original start time survived),
# which already carry the 24-hour window sums computed above.
df2.groupby('deviceId').apply(grouped_map)\
   .filter(F.col("time-started-2") == F.col("time-started"))\
   .drop("time-started-2")\
   .orderBy("deviceId")\
   .show()

#+--------+-------------------+-----+------+--------+-------+
#|deviceId|       time-started|OrgId|  type|duration| packet|
#+--------+-------------------+-----+------+--------+-------+
#|       A|2020-04-08 01:53:54| Org1|  wifi|   23909|8213496|
#|       A|2020-04-09 04:27:33| Org1|remote|     186|1223887|
#|       A|2020-04-10 04:31:35| Org1|remote|       0|     22|
#|       B|2020-04-08 21:56:58| Org2|remote|    2353|2357168|
#|       B|2020-04-10 23:10:15| Org2|remote|       1|    497|
#+--------+-------------------+-----+------+--------+-------+
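
As a quick sanity check, the first 24-hour window for device A can be summed directly on the raw test data (the literal below is just that record's start time plus 24 hours; time-started is still a string here, so the comparison is lexicographic):

dftest.filter((F.col("deviceId") == "A") & (F.col("time-started") < "2020-04-09T01:53:54.932000"))\
      .agg(F.sum("duration").alias("duration"), F.sum("packet").alias("packet"))\
      .show()

#+--------+-------+
#|duration| packet|
#+--------+-------+
#|   23909|8213496|
#+--------+-------+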

You can also take a look at a similar question; the solution proposed there was a Scala UDF using foldLeft. I think the pandas grouped-map UDF is the better alternative here.
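
For reference, the "simple UDF" route mentioned above can also be sketched in PySpark: collect each device's rows with collect_list and fold them in plain Python, much like that Scala foldLeft solution. This is only a minimal sketch under that assumption; the helper names fold_sessions/fold_udf are illustrative, and it keeps only the aggregated columns (dropping OrgId and type):

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, LongType, StructField, StructType

# Fold one device's records (sorted by start time) into rolling 24-hour
# segments; each segment keeps its start plus the summed duration and packets.
def fold_sessions(rows):
    segments = []
    for r in sorted(rows, key=lambda x: x["ts"]):
        if segments and r["ts"] < segments[-1][0] + 24 * 60 * 60:
            start, dur, pkt = segments[-1]
            segments[-1] = (start, dur + r["duration"], pkt + r["packet"])
        else:
            segments.append((r["ts"], r["duration"], r["packet"]))
    return segments

fold_udf = F.udf(fold_sessions, ArrayType(StructType([
    StructField("start", LongType()),
    StructField("duration", LongType()),
    StructField("packet", LongType()),
])))

dftest.withColumn("ts", F.to_timestamp("time-started").cast("long"))\
      .groupBy("deviceId")\
      .agg(F.collect_list(F.struct("ts", "duration", "packet")).alias("rows"))\
      .select("deviceId", F.explode(fold_udf("rows")).alias("seg"))\
      .select("deviceId",
              F.col("seg.start").cast("timestamp").alias("time-started"),
              "seg.duration", "seg.packet")\
      .orderBy("deviceId", "time-started")\
      .show()

A row-by-row Python fold over collected lists will not vectorize, so on large data the grouped-map pandas UDF above is still the better choice.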

