How to group by time interval in Spark SQL

Question

My dataset looks like this:

KEY |Event_Type | metric | Time 
001 |event1     | 10     | 2016-05-01 10:50:51
002 |event2     | 100    | 2016-05-01 10:50:53
001 |event3     | 20     | 2016-05-01 10:50:55
001 |event1     | 15     | 2016-05-01 10:51:50
003 |event1     | 13     | 2016-05-01 10:55:30
001 |event2     | 12     | 2016-05-01 10:57:00
001 |event3     | 11     | 2016-05-01 11:00:01

I want to get all the keys that satisfy this condition:

The sum of the metric for a specific event is greater than a threshold within 5 minutes.

This looks to me like a perfect candidate for sliding window functions.

How can I do this with Spark SQL?

Thanks.

Answer

Spark >= 2.0

You can use window (the grouping function, not to be confused with window functions). Depending on the variant, it assigns each timestamp to one or more, potentially overlapping, buckets:

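import org.apache.spark.sql.functions.window  // $"..." also needs spark.implicits._
df.groupBy($"KEY", window($"Time".cast("timestamp"), "5 minutes")).sum("metric")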

// +---+---------------------------------------------+-----------+
// |KEY|window                                       |sum(metric)|
// +---+---------------------------------------------+-----------+
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45         |
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12         |
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13         |
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11         |
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100        |
// +---+---------------------------------------------+-----------+
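The following minimal plain-Scala sketch, with no Spark dependency, shows which buckets a timestamp falls into. It assumes that windows start at multiples of the slide duration with no start-time offset, and that times are expressed as seconds since midnight. It models what window(timeColumn, windowDuration, slideDuration) computes, not Spark's actual implementation. The names WindowAssign, windowsFor and hms are hypothetical.

```scala
object WindowAssign {
  // Start times of every window [start, start + windowSec) that contains t.
  // Assumption: windows start at multiples of slideSec (no start-time offset).
  // slideSec == windowSec models tumbling windows; slideSec < windowSec models sliding ones.
  def windowsFor(t: Long, windowSec: Long, slideSec: Long): List[Long] = {
    require(windowSec > 0 && slideSec > 0 && slideSec <= windowSec)
    // Latest window start that is not after t
    val latest = Math.floorDiv(t, slideSec) * slideSec
    // Step back one slide at a time while the window still covers t
    Iterator.iterate(latest)(_ - slideSec).takeWhile(_ > t - windowSec).toList
  }

  // Format seconds since midnight as HH:MM:SS
  def hms(sec: Long): String = f"${sec / 3600}%02d:${sec / 60 % 60}%02d:${sec % 60}%02d"

  def main(args: Array[String]): Unit = {
    val t = 10 * 3600L + 50 * 60 + 51 // 10:50:51
    println(windowsFor(t, 300, 300).map(hms)) // List(10:50:00)
    println(windowsFor(t, 300, 60).map(hms))  // List(10:50:00, 10:49:00, 10:48:00, 10:47:00, 10:46:00)
  }
}
```

With a 5-minute window and no slide, each row lands in exactly one bucket, as in the table above. Passing a slide such as "1 minute" as the third argument to window() makes the buckets overlap, so every row is counted in several of them.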

Spark < 2.0

Let's start with the example data:

import spark.implicits._  // import sqlContext.implicits._ in Spark < 2.0

val df = Seq(
  ("001", "event1", 10, "2016-05-01 10:50:51"),
  ("002", "event2", 100, "2016-05-01 10:50:53"),
  ("001", "event3", 20, "2016-05-01 10:50:55"),
  ("001", "event1", 15, "2016-05-01 10:51:50"),
  ("003", "event1", 13, "2016-05-01 10:55:30"),
  ("001", "event2", 12, "2016-05-01 10:57:00"),
  ("001", "event3", 11, "2016-05-01 11:00:01")
).toDF("KEY", "Event_Type", "metric", "Time")

I assume that an event is identified by KEY. If this is not the case, you can adjust the GROUP BY / PARTITION BY clauses to suit your requirements.
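The question asks about "a specific event". If an event is identified by KEY together with Event_Type, the grouping key grows by one column. In Spark, that means adding $"Event_Type" to the groupBy. The following minimal plain-Scala sketch applies that grouping to the example rows. It assumes 5-minute tumbling buckets and times written as seconds since midnight. The names GroupByKeyAndEvent, Row, sec and sumsByKeyEventBucket are hypothetical.

```scala
object GroupByKeyAndEvent {
  // One input row; time is seconds since midnight (an assumption for brevity)
  final case class Row(key: String, eventType: String, metric: Int, time: Long)

  def sec(h: Int, m: Int, s: Int): Long = h * 3600L + m * 60L + s

  val rows: List[Row] = List(
    Row("001", "event1", 10, sec(10, 50, 51)),
    Row("002", "event2", 100, sec(10, 50, 53)),
    Row("001", "event3", 20, sec(10, 50, 55)),
    Row("001", "event1", 15, sec(10, 51, 50)),
    Row("003", "event1", 13, sec(10, 55, 30)),
    Row("001", "event2", 12, sec(10, 57, 0)),
    Row("001", "event3", 11, sec(11, 0, 1))
  )

  // Sum of metric per (key, eventType, bucket start) using tumbling buckets of bucketSec
  def sumsByKeyEventBucket(rs: List[Row], bucketSec: Long): Map[(String, String, Long), Int] =
    rs.groupBy(r => (r.key, r.eventType, Math.floorDiv(r.time, bucketSec) * bucketSec))
      .map { case (k, group) => k -> group.map(_.metric).sum }
}
```

Grouped by KEY alone, key 001 sums to 45 (10 + 20 + 15) in the 10:50 bucket. Split by Event_Type, the same bucket gives 25 for event1 and 20 for event3.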

If you're interested in an aggregation over static windows that are independent of the data, convert the timestamps to a numeric data type and round:

import org.apache.spark.sql.functions.{round, sum}

// Cast the string to a timestamp, then to long (seconds since the Unix epoch)
val ts = $"Time".cast("timestamp").cast("long")

// Round to a 300-second interval
// In Spark >= 3.1, replace .cast("timestamp") with timestamp_seconds(...)
val interval = (round(ts / 300L) * 300.0).cast("timestamp").alias("interval")

df.groupBy($"KEY", interval).sum("metric")

// +---+---------------------+-----------+
// |KEY|interval             |sum(metric)|
// +---+---------------------+-----------+
// |001|2016-05-01 11:00:00.0|11         |
// |001|2016-05-01 10:55:00.0|12         |
// |001|2016-05-01 10:50:00.0|45         |
// |003|2016-05-01 10:55:00.0|13         |
// |002|2016-05-01 10:50:00.0|100        |
// +---+---------------------+-----------+
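One detail is worth noting. round() labels each row with the nearest multiple of 300 seconds, so the bucket labelled 10:55:00 covers 10:52:30 up to, but not including, 10:57:30. window() and floor() instead use [10:55:00, 11:00:00). For the sample rows, both choices happen to give the same labels. The following minimal plain-Scala sketch compares the two. It assumes times in seconds since midnight, and the names RoundVsFloor, roundBucket and floorBucket are hypothetical. In Spark, using floor from org.apache.spark.sql.functions in place of round gives window-aligned buckets.

```scala
object RoundVsFloor {
  val BucketSec = 300L

  // Nearest multiple of 300 s, mirroring round(ts / 300) * 300:
  // the bucket labelled L covers [L - 150 s, L + 150 s)
  def roundBucket(t: Long): Long = Math.round(t.toDouble / BucketSec) * BucketSec

  // Largest multiple of 300 s not after t: the bucket labelled L covers [L, L + 300 s),
  // the same alignment as 5-minute tumbling window() buckets
  def floorBucket(t: Long): Long = Math.floorDiv(t, BucketSec) * BucketSec

  def main(args: Array[String]): Unit = {
    val t = 10 * 3600L + 52 * 60 + 40 // 10:52:40
    println(roundBucket(t)) // 39300, i.e. 10:55:00
    println(floorBucket(t)) // 39000, i.e. 10:50:00
  }
}
```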

If you're interested in a window relative to the current row, use window functions:

import org.apache.spark.sql.expressions.Window

// Partition by KEY
// Order by the numeric ts column (seconds), so the range frame is measured in seconds
// Frame: rows whose ts lies within 150 seconds before or after the current row
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150)
df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w))

// +---+----------+------+-------------------+----------+----------+
// |KEY|Event_Type|metric|Time               |ts        |window_sum|
// +---+----------+------+-------------------+----------+----------+
// |003|event1    |13    |2016-05-01 10:55:30|1462092930|13        |
// |001|event1    |10    |2016-05-01 10:50:51|1462092651|45        |
// |001|event3    |20    |2016-05-01 10:50:55|1462092655|45        |
// |001|event1    |15    |2016-05-01 10:51:50|1462092710|45        |
// |001|event2    |12    |2016-05-01 10:57:00|1462093020|12        |
// |001|event3    |11    |2016-05-01 11:00:01|1462093201|11        |
// |002|event2    |100   |2016-05-01 10:50:53|1462092653|100       |
// +---+----------+------+-------------------+----------+----------+
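The rangeBetween(-150, 150) frame can be read as follows: for each row, sum the metric of all rows with the same KEY whose ts lies within 150 seconds on either side. The following minimal plain-Scala sketch reproduces that computation and then applies the threshold filter the question asks for. It is quadratic per key and only meant to show the semantics. Times are seconds since midnight, and the names RangeWindowSum, Row, windowSums and keysOverThreshold, as well as the threshold values in the tests, are hypothetical. In Spark, the filter would be a .where($"window_sum" > threshold) on the result above.

```scala
object RangeWindowSum {
  final case class Row(key: String, metric: Int, ts: Long)

  def sec(h: Int, m: Int, s: Int): Long = h * 3600L + m * 60L + s

  val rows: List[Row] = List(
    Row("001", 10, sec(10, 50, 51)),
    Row("002", 100, sec(10, 50, 53)),
    Row("001", 20, sec(10, 50, 55)),
    Row("001", 15, sec(10, 51, 50)),
    Row("003", 13, sec(10, 55, 30)),
    Row("001", 12, sec(10, 57, 0)),
    Row("001", 11, sec(11, 0, 1))
  )

  // For each row: sum of metric over rows with the same key and
  // ts in [row.ts - before, row.ts + after], like rangeBetween(-before, after)
  def windowSums(rs: List[Row], before: Long, after: Long): List[(Row, Int)] =
    rs.map { r =>
      val inFrame = rs.filter(o => o.key == r.key && o.ts >= r.ts - before && o.ts <= r.ts + after)
      r -> inFrame.map(_.metric).sum
    }

  // Keys with at least one row whose +/-150 s frame sum exceeds the threshold
  def keysOverThreshold(rs: List[Row], threshold: Int): Set[String] =
    windowSums(rs, 150, 150).collect { case (r, s) if s > threshold => r.key }.toSet
}
```

The per-row sums match the window_sum column above (45, 45, 45, 12 and 11 for key 001).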

For performance reasons, this approach is useful only if the data can be partitioned into multiple separate groups, because all rows sharing a KEY are shuffled to and processed by a single task. In Spark < 2.0.0, you'll also need HiveContext to make it work.