Hourly Aggregation in PySpark


Problem Description

I'm looking for a way to aggregate my data by hour. First I want to keep only the hour part of my evtTime. My DataFrame looks like this:

Row(access=u'WRITE', 
    agentHost=u'xxxxxx50.haas.xxxxxx', 
    cliIP=u'192.000.00.000', 
    enforcer=u'ranger-acl', 
    event_count=1, 
    event_dur_ms=0, 
    evtTime=u'2017-10-01 23:03:51.337', 
    id=u'a43d824c-1e53-439b-b374-96b76bacf714', 
    logType=u'RangerAudit', 
    policy=699, 
    reason=u'/project-h/xxxx/xxxx/warehouse/rocq.db/f_crcm_res_temps_retrait', 
    repoType=1, 
    reqUser=u'rocqphadm', 
    resType=u'path', 
    resource=u'/project-h/xxxx/xxxx/warehouse/rocq.db/f_crcm_res_temps_retrait', 
    result=1, 
    seq_num=342976577) 

My objective is then to group by reqUser and calculate the sum of event_count. I tried this:

import datetime

from pyspark.sql.functions import udf, col, hour
from pyspark.sql.types import DateType

func = udf(lambda x: datetime.datetime.strptime(x, '%Y-%m-%d %H:%M:%S.%f'), DateType())
df1 = df.withColumn('DATE', func(col('evtTime')))

metrics_DataFrame = (df1
                     .groupBy(hour('DATE'), 'reqUser')
                     .agg({'event_count': 'sum'})
                    )

This is the result:

[Row(hour(DATE)=0, reqUser=u'A383914', sum(event_count)=12114),
Row(hour(DATE)=0, reqUser=u'xxxxadm', sum(event_count)=211631),
Row(hour(DATE)=0, reqUser=u'splunk-system-user', sum(event_count)=48),
Row(hour(DATE)=0, reqUser=u'adm', sum(event_count)=7608),
Row(hour(DATE)=0, reqUser=u'X165473', sum(event_count)=2)]

My objective is to get something like this:

[Row(hour(DATE)=2017-10-01 23:00:00, reqUser=u'A383914', sum(event_count)=12114),
Row(hour(DATE)=2017-10-01 22:00:00, reqUser=u'xxxxadm', sum(event_count)=211631),
Row(hour(DATE)=2017-10-01 08:00:00, reqUser=u'splunk-system-user', sum(event_count)=48),
Row(hour(DATE)=2017-10-01 03:00:00, reqUser=u'adm', sum(event_count)=7608),
Row(hour(DATE)=2017-10-01 11:00:00, reqUser=u'X165473', sum(event_count)=2)]


Recommended Answer

There are multiple possible solutions; the simplest one is to use only the required part as a string:

from pyspark.sql.functions import substring, to_timestamp

df = spark.createDataFrame(["2017-10-01 23:03:51.337"], "string").toDF("evtTime")

df.withColumn("hour", substring("evtTime", 0, 13)).show()
# +--------------------+-------------+                                            
# |             evtTime|         hour|
# +--------------------+-------------+
# |2017-10-01 23:03:...|2017-10-01 23|
# +--------------------+-------------+

Or as a timestamp:

df.withColumn("hour", to_timestamp(substring("evtTime", 0, 13), "yyyy-MM-dd HH")).show()
# +--------------------+-------------------+
# |             evtTime|               hour|
# +--------------------+-------------------+
# |2017-10-01 23:03:...|2017-10-01 23:00:00|
# +--------------------+-------------------+

You can also use date_format:

from pyspark.sql.functions import date_format, col

df.withColumn("hour", date_format(col("evtTime").cast("timestamp"), "yyyy-MM-dd HH:00")).show()
# +--------------------+----------------+
# |             evtTime|            hour|
# +--------------------+----------------+
# |2017-10-01 23:03:...|2017-10-01 23:00|
# +--------------------+----------------+

Or date_trunc (available in Spark 2.3 and later):

from pyspark.sql.functions import date_trunc

df.withColumn("hour", date_trunc("hour", col("evtTime").cast("timestamp"))).show()
# +--------------------+-------------------+                                      
# |             evtTime|               hour|
# +--------------------+-------------------+
# |2017-10-01 23:03:...|2017-10-01 23:00:00|
# +--------------------+-------------------+
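The answer stops at producing the hourly column. As a final step, a minimal sketch (not part of the original answer) of the aggregation the question asks for is shown below, assuming the question's original DataFrame (called df here) with evtTime, reqUser and event_count columns, and Spark 2.3+ for date_trunc:

from pyspark.sql.functions import col, date_trunc, sum as sum_

# Truncate evtTime to the start of its hour, then aggregate per hour and reqUser.
hourly = (df
          .withColumn("hour", date_trunc("hour", col("evtTime").cast("timestamp")))
          .groupBy("hour", "reqUser")
          .agg(sum_("event_count").alias("sum(event_count)")))

hourly.show(truncate=False)

Any of the other approaches (substring, to_timestamp, date_format) can be substituted for date_trunc when building the "hour" column.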

