在 PySpark Dataframe 中对连续行进行分组 [英] grouping consecutive rows in PySpark Dataframe
问题描述
我有以下示例 Spark DataFrame:
I have the following example Spark DataFrame:
rdd = sc.parallelize([(1,"19:00:00", "19:30:00", 30), (1,"19:30:00", "19:40:00", 10),(1,"19:40:00", "19:43:00", 3), (2,"20:00:00", "20:10:00", 10), (1,"20:05:00", "20:15:00", 10),(1,"20:15:00", "20:35:00", 20)])
df = spark.createDataFrame(rdd, ["user_id", "start_time", "end_time", "duration"])
df.show()
+-------+----------+--------+--------+
|user_id|start_time|end_time|duration|
+-------+----------+--------+--------+
| 1| 19:00:00|19:30:00| 30|
| 1| 19:30:00|19:40:00| 10|
| 1| 19:40:00|19:43:00| 3|
| 2| 20:00:00|20:10:00| 10|
| 1| 20:05:00|20:15:00| 10|
| 1| 20:15:00|20:35:00| 20|
+-------+----------+--------+--------+
我想根据开始和结束时间对连续行进行分组.例如,对于相同的user_id,如果一行的开始时间与前一行的结束时间相同,我想将它们组合在一起并将持续时间相加.
I want to group consecutive rows based on the start and end times. For instance, for the same user_id, if a row's start time is the same as the previous row's end time, I want to group them together and sum the duration.
想要的结果是:
+-------+----------+--------+--------+
|user_id|start_time|end_time|duration|
+-------+----------+--------+--------+
| 1| 19:00:00|19:43:00| 43|
| 2| 20:00:00|20:10:00| 10|
| 1| 20:05:00|20:35:00| 30|
+-------+----------+--------+--------+
数据帧的前三行被组合在一起,因为它们都对应于 user_id 1,并且开始时间和结束时间形成一个连续的时间线.
The first three rows of the dataframe were grouped together because they all correspond to user_id 1 and the start times and end times form a continuous timeline.
这是我最初的方法:
使用滞后函数获取下次开始时间:
Use the lag function to get the next start time:
from pyspark.sql.functions import *
from pyspark.sql import Window
import sys
# compute next start time
window = Window.partitionBy('user_id').orderBy('start_time')
df = df.withColumn("next_start_time", lag(df.start_time, -1).over(window))
df.show()
+-------+----------+--------+--------+---------------+
|user_id|start_time|end_time|duration|next_start_time|
+-------+----------+--------+--------+---------------+
| 1| 19:00:00|19:30:00| 30| 19:30:00|
| 1| 19:30:00|19:40:00| 10| 19:40:00|
| 1| 19:40:00|19:43:00| 3| 20:05:00|
| 1| 20:05:00|20:15:00| 10| 20:15:00|
| 1| 20:15:00|20:35:00| 20| null|
| 2| 20:00:00|20:10:00| 10| null|
+-------+----------+--------+--------+---------------+
获取当前行结束时间与下一行开始时间的差值:
get the difference between the current row's end time and the next row's start time:
time_fmt = "HH:mm:ss"
timeDiff = unix_timestamp('next_start_time', format=time_fmt) - unix_timestamp('end_time', format=time_fmt)
df = df.withColumn("difference", timeDiff)
df.show()
+-------+----------+--------+--------+---------------+----------+
|user_id|start_time|end_time|duration|next_start_time|difference|
+-------+----------+--------+--------+---------------+----------+
| 1| 19:00:00|19:30:00| 30| 19:30:00| 0|
| 1| 19:30:00|19:40:00| 10| 19:40:00| 0|
| 1| 19:40:00|19:43:00| 3| 20:05:00| 1320|
| 1| 20:05:00|20:15:00| 10| 20:15:00| 0|
| 1| 20:15:00|20:35:00| 20| null| null|
| 2| 20:00:00|20:10:00| 10| null| null|
+-------+----------+--------+--------+---------------+----------+
现在我的想法是使用带有窗口的 sum 函数来获取持续时间的累积总和,然后进行 groupBy.但我的方法有缺陷,原因有很多.
Now my idea was to use the sum function with a window to get the cumulative sum of duration and then do a groupBy. But my approach was flawed for many reasons.
推荐答案
这是一种方法:
将行收集到组中,其中一组是一组具有相同 user_id
且连续的行(start_time
与之前的 end_time
匹配).然后你可以使用这个 group
来做你的聚合.
Gather together rows into groups where a group is a set of rows with the same user_id
that are consecutive (start_time
matches previous end_time
). Then you can use this group
to do your aggregation.
到达这里的一种方法是创建中间指标列来告诉您用户是否已更改或时间是否不连续.然后对指标列执行累积求和以创建 group
.
A way to get here is by creating intermediate indicator columns to tell you if the user has changed or the time is not consecutive. Then perform a cumulative sum over the indicator column to create the group
.
例如:
import pyspark.sql.functions as f
from pyspark.sql import Window
w1 = Window.orderBy("start_time")
df = df.withColumn(
"userChange",
(f.col("user_id") != f.lag("user_id").over(w1)).cast("int")
)\
.withColumn(
"timeChange",
(f.col("start_time") != f.lag("end_time").over(w1)).cast("int")
)\
.fillna(
0,
subset=["userChange", "timeChange"]
)\
.withColumn(
"indicator",
(~((f.col("userChange") == 0) & (f.col("timeChange")==0))).cast("int")
)\
.withColumn(
"group",
f.sum(f.col("indicator")).over(w1.rangeBetween(Window.unboundedPreceding, 0))
)
df.show()
#+-------+----------+--------+--------+----------+----------+---------+-----+
#|user_id|start_time|end_time|duration|userChange|timeChange|indicator|group|
#+-------+----------+--------+--------+----------+----------+---------+-----+
#| 1| 19:00:00|19:30:00| 30| 0| 0| 0| 0|
#| 1| 19:30:00|19:40:00| 10| 0| 0| 0| 0|
#| 1| 19:40:00|19:43:00| 3| 0| 0| 0| 0|
#| 2| 20:00:00|20:10:00| 10| 1| 1| 1| 1|
#| 1| 20:05:00|20:15:00| 10| 1| 1| 1| 2|
#| 1| 20:15:00|20:35:00| 20| 0| 0| 0| 2|
#+-------+----------+--------+--------+----------+----------+---------+-----+
现在我们有了 group
列,我们可以按如下方式聚合以获得所需的结果:
Now that we have the group
column, we can aggregate as follows to get the desired result:
df.groupBy("user_id", "group")\
.agg(
f.min("start_time").alias("start_time"),
f.max("end_time").alias("end_time"),
f.sum("duration").alias("duration")
)\
.drop("group")\
.show()
#+-------+----------+--------+--------+
#|user_id|start_time|end_time|duration|
#+-------+----------+--------+--------+
#| 1| 19:00:00|19:43:00| 43|
#| 1| 20:05:00|20:35:00| 30|
#| 2| 20:00:00|20:10:00| 10|
#+-------+----------+--------+--------+
这篇关于在 PySpark Dataframe 中对连续行进行分组的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!