Group by a Pyspark Dataframe by time interval
Question
I have a data frame with timestamps generated for it:
from pyspark.sql.functions import avg, first
rdd = sc.parallelize(
[
(0, "A", 223,"201603_170302", "PORT"),
(0, "A", 22,"201602_100302", "PORT"),
(0, "A", 422,"201601_114300", "DOCK"),
(1,"B", 3213,"201602_121302", "DOCK")
]
)
df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])
So I can generate a datetime:
from datetime import datetime
from pyspark.sql.functions import udf
dt_parse = udf(lambda x: datetime.strptime(x, "%Y%m%d_%H%M%S"), "timestamp")
df_data = df_data.withColumn('datetime', dt_parse(df_data.date))
But now I need to group by intervals of 6 hours, per day. Grouping per hour would be something along the lines of:
from pyspark.sql.functions import count, hour
df_data.groupby(hour(df_data.datetime)).agg(count("ship").alias("ship")).show()
But this wouldn't work for intervals other than one hour. Is there a way to do it?
Recommended answer
This worked for me.
import pyspark.sql.functions as F
# ...
interval = 60 * 60 * 6  # 6 hours, in seconds
# obj['field'] is presumably the name of the timestamp column
gdf = dataframe.withColumn(
    'time_interval',
    # Floor the epoch seconds to the start of the enclosing 6-hour bucket
    F.from_unixtime(
        F.floor(F.unix_timestamp(dataframe[obj['field']]) / interval) * interval
    )
).groupBy('time_interval')
# and then something like gdf.agg(...).collect()
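To make the arithmetic in the answer easier to follow, here is a minimal plain-Python sketch of the same flooring step, without Spark. The helper name bucket_start and the sample timestamps are hypothetical (they are not taken from the question's data), and the sketch assumes all timestamps are in UTC.

```python
from datetime import datetime, timezone

INTERVAL = 60 * 60 * 6  # 6 hours, in seconds


def bucket_start(ts, interval=INTERVAL):
    """Return the start of the fixed-width bucket that contains ts."""
    epoch_seconds = int(ts.timestamp())
    # Integer division drops the remainder, so this rounds down
    # to the nearest multiple of `interval`.
    floored = (epoch_seconds // interval) * interval
    return datetime.fromtimestamp(floored, tz=timezone.utc)


# Hypothetical sample timestamps (UTC), for illustration only
samples = [
    datetime(2016, 3, 1, 17, 3, 2, tzinfo=timezone.utc),
    datetime(2016, 3, 1, 10, 3, 2, tzinfo=timezone.utc),
    datetime(2016, 3, 1, 11, 43, 0, tzinfo=timezone.utc),
    datetime(2016, 3, 2, 12, 13, 2, tzinfo=timezone.utc),
]

# Count rows per bucket, the plain-Python analogue of groupBy(...).count()
counts = {}
for ts in samples:
    key = bucket_start(ts)
    counts[key] = counts.get(key, 0) + 1

for key in sorted(counts):
    print(key.isoformat(), counts[key])
```

Because the flooring is relative to the Unix epoch, a 6-hour interval gives boundaries at 00:00, 06:00, 12:00 and 18:00 UTC, and each bucket start includes the date, so the groups are already "per day". In Spark, from_unixtime renders the result in the session time zone, so the displayed boundaries may appear shifted if that zone is not UTC.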