How to group by time interval in Spark SQL
Question
My dataset is as follows:
KEY |Event_Type | metric | Time
001 |event1 | 10 | 2016-05-01 10:50:51
002 |event2 | 100 | 2016-05-01 10:50:53
001 |event3 | 20 | 2016-05-01 10:50:55
001 |event1 | 15 | 2016-05-01 10:51:50
003 |event1 | 13 | 2016-05-01 10:55:30
001 |event2 | 12 | 2016-05-01 10:57:00
001 |event3 | 11 | 2016-05-01 11:00:01
I want to get all the keys that satisfy this condition:
"SUM of metric for a specific event" > threshold during 5 minutes.
This looks to me like a perfect candidate for sliding window functions.
How can I do this with Spark SQL?
Thanks.
Answer
Spark >= 2.0
You can use window (not to be confused with window functions). Depending on the variant, it assigns each timestamp to one or more, potentially overlapping, buckets:
df.groupBy($"KEY", window($"time", "5 minutes")).sum("metric")
// +---+---------------------------------------------+-----------+
// |KEY|window |sum(metric)|
// +---+---------------------------------------------+-----------+
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45 |
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12 |
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13 |
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11 |
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100 |
// +---+---------------------------------------------+-----------+
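To get the keys the question asks for, the aggregated result can be filtered against the threshold; the three-argument variant of window produces overlapping (sliding) buckets. A minimal sketch, assuming a hypothetical threshold of 30 (the question leaves the value unspecified):

import org.apache.spark.sql.functions.{sum, window}

// Hypothetical threshold; adjust to your use case
val threshold = 30

// Tumbling 5-minute windows, keeping only groups whose sum exceeds the threshold
df.groupBy($"KEY", window($"Time", "5 minutes"))
  .agg(sum($"metric").alias("sum_metric"))
  .where($"sum_metric" > threshold)

// Overlapping variant: a 5-minute window sliding every minute
df.groupBy($"KEY", window($"Time", "5 minutes", "1 minute"))
  .agg(sum($"metric").alias("sum_metric"))
  .where($"sum_metric" > threshold)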
Spark < 2.0
Let's start with some example data:
import spark.implicits._ // import sqlContext.implicits._ in Spark < 2.0
val df = Seq(
("001", "event1", 10, "2016-05-01 10:50:51"),
("002", "event2", 100, "2016-05-01 10:50:53"),
("001", "event3", 20, "2016-05-01 10:50:55"),
("001", "event1", 15, "2016-05-01 10:51:50"),
("003", "event1", 13, "2016-05-01 10:55:30"),
("001", "event2", 12, "2016-05-01 10:57:00"),
("001", "event3", 11, "2016-05-01 11:00:01")
).toDF("KEY", "Event_Type", "metric", "Time")
I assume that an event is identified by KEY. If this is not the case, you can adjust the GROUP BY / PARTITION BY clauses according to your requirements.
If you're interested in an aggregation with a static window, independent of the data, convert the timestamps to a numeric data type and round:
import org.apache.spark.sql.functions.{round, sum}
// cast string to timestamp
val ts = $"Time".cast("timestamp").cast("long")
// Round to the nearest 300-second (5-minute) boundary
val interval = (round(ts / 300L) * 300.0).cast("timestamp").alias("interval")
df.groupBy($"KEY", interval).sum("metric")
// +---+---------------------+-----------+
// |KEY|interval |sum(metric)|
// +---+---------------------+-----------+
// |001|2016-05-01 11:00:00.0|11 |
// |001|2016-05-01 10:55:00.0|12 |
// |001|2016-05-01 10:50:00.0|45 |
// |003|2016-05-01 10:55:00.0|13 |
// |002|2016-05-01 10:50:00.0|100 |
// +---+---------------------+-----------+
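The same rounding can also be expressed in raw SQL, which may be closer to what the question literally asked for. A minimal sketch, assuming a hypothetical temporary view named events and default (non-ANSI) casting behavior; in Spark < 2.0 use registerTempTable and sqlContext.sql instead:

df.createOrReplaceTempView("events") // df.registerTempTable("events") in Spark < 2.0

spark.sql("""
  SELECT KEY, interval, SUM(metric) AS sum_metric
  FROM (
    SELECT KEY, metric,
           CAST(ROUND(CAST(CAST(Time AS TIMESTAMP) AS LONG) / 300) * 300 AS TIMESTAMP) AS interval
    FROM events
  ) tmp
  GROUP BY KEY, interval
""")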
If you're interested in a window relative to the current row, use window functions:
import org.apache.spark.sql.expressions.Window
// Partition by KEY
// Order by timestamp
// Consider a window of -150 to +150 seconds relative to the current row
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150)
df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w))
// +---+----------+------+-------------------+----------+----------+
// |KEY|Event_Type|metric|Time |ts |window_sum|
// +---+----------+------+-------------------+----------+----------+
// |003|event1 |13 |2016-05-01 10:55:30|1462092930|13 |
// |001|event1 |10 |2016-05-01 10:50:51|1462092651|45 |
// |001|event3 |20 |2016-05-01 10:50:55|1462092655|45 |
// |001|event1 |15 |2016-05-01 10:51:50|1462092710|45 |
// |001|event2 |12 |2016-05-01 10:57:00|1462093020|12 |
// |001|event3 |11 |2016-05-01 11:00:01|1462093201|11 |
// |002|event2 |100 |2016-05-01 10:50:53|1462092653|100 |
// +---+----------+------+-------------------+----------+----------+
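To finish answering the question with this approach, compare the sliding sum against the threshold. A minimal sketch, reusing ts and w from above and the same hypothetical threshold of 30:

// Hypothetical threshold; the question leaves the value unspecified
val threshold = 30

df.withColumn("ts", ts)
  .withColumn("window_sum", sum($"metric").over(w))
  .where($"window_sum" > threshold)
  .drop("ts")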
For performance reasons, this approach is useful only if the data can be partitioned into multiple separate groups. In Spark < 2.0.0 you'll also need a HiveContext to make it work.