Spark Advanced Window with dynamic last


Problem Description


Problem: Given time-series data, a clickstream of user activity stored in Hive, the ask is to enrich the data with a session id using Spark.

Session Definition

  • A session expires after 1 hour of inactivity
  • A session remains active for at most 2 hours in total; once that cap is reached, the next click starts a new session even if it arrives within the hour

Data:

click_time,user_id
2018-01-01 11:00:00,u1
2018-01-01 12:10:00,u1
2018-01-01 13:00:00,u1
2018-01-01 13:50:00,u1
2018-01-01 14:40:00,u1
2018-01-01 15:30:00,u1
2018-01-01 16:20:00,u1
2018-01-01 16:50:00,u1
2018-01-01 11:00:00,u2
2018-01-02 11:00:00,u2

Below is a partial solution that considers only the first point of the session definition:

val win1 = Window.partitionBy("user_id").orderBy("click_time")
val sessionnew = when((unix_timestamp($"click_time") - unix_timestamp(
    lag($"click_time", 1, "2017-01-01 11:00:00.0").over(win1))) / 60 >= 60, 1).otherwise(0)
userActivity
  .withColumn("session_num", sum(sessionnew).over(win1))
  .withColumn("session_id", concat($"user_id", $"session_num"))
  .show(truncate = false)

Actual Output:

+---------------------+-------+-----------+----------+
|click_time           |user_id|session_num|session_id|
+---------------------+-------+-----------+----------+
|2018-01-01 11:00:00.0|u1     |1          |u11       |
|2018-01-01 12:10:00.0|u1     |2          |u12       | -- session u12 starts
|2018-01-01 13:00:00.0|u1     |2          |u12       |
|2018-01-01 13:50:00.0|u1     |2          |u12       |
|2018-01-01 14:40:00.0|u1     |2          |u12       | -- this should be a new session as diff of session start of u12 and this row exceeds 2 hours
|2018-01-01 15:30:00.0|u1     |2          |u12       |
|2018-01-01 16:20:00.0|u1     |2          |u12       |
|2018-01-01 16:50:00.0|u1     |2          |u12       | -- now this has to be compared with row 5 to find difference
|2018-01-01 11:00:00.0|u2     |1          |u21       |
|2018-01-02 11:00:00.0|u2     |2          |u22       |
+---------------------+-------+-----------+----------+

To include the second condition, I tried to compute the difference between the current time and the last session start time to check whether it exceeds 2 hours, but the reference itself changes for the following rows: once a row crosses the 2-hour cap, that row becomes the new session start for the rows after it. Some use cases like this can be handled with a running sum, but that doesn't fit here.
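To make the difficulty concrete, here is a hedged sketch (not the asker's actual attempt; win1 and userActivity as above, and since_ref/exceeds_2h are names used only for illustration) of measuring elapsed time against a fixed reference with first over the window. The reference never advances to the start of each new session, which is exactly what rule #2 would require:

// Illustration only: since_ref measures from the first click in the partition,
// not from the start of the current session, so the 2-hour comparison is wrong
// for every session after the first.
val attempt = userActivity
  .withColumn("since_ref",
    unix_timestamp($"click_time") - unix_timestamp(first($"click_time").over(win1)))
  .withColumn("exceeds_2h", $"since_ref" >= 2 * 60 * 60)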

Solution

Not a straightforward problem to solve, but here's one approach:

  1. Use Window lag timestamp difference to identify sessions (with 0 = start of a session) per user for rule #1
  2. Group the dataset to assemble the timestamp diff list per user
  3. Process via a UDF the timestamp diff list to identify sessions for rule #2 and create all session ids per user
  4. Expand the grouped dataset via Spark's explode

Sample code below:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val userActivity = Seq(
  ("2018-01-01 11:00:00", "u1"),
  ("2018-01-01 12:10:00", "u1"),
  ("2018-01-01 13:00:00", "u1"),
  ("2018-01-01 13:50:00", "u1"),
  ("2018-01-01 14:40:00", "u1"),
  ("2018-01-01 15:30:00", "u1"),
  ("2018-01-01 16:20:00", "u1"),
  ("2018-01-01 16:50:00", "u1"),
  ("2018-01-01 11:00:00", "u2"),
  ("2018-01-02 11:00:00", "u2")
).toDF("click_time", "user_id")

def clickSessList(tmo: Long) = udf{ (uid: String, clickList: Seq[String], tsList: Seq[Long]) =>
  def sid(n: Long) = s"$uid-$n"  // format a session id, e.g. "u1-2"

  // Fold over the step-1 diffs: open a new session when a diff is 0 (a rule-#1
  // session start) or when the time accumulated since the current session
  // started would reach the cap tmo (rule #2).
  val sessList = tsList.foldLeft( (List[String](), 0L, 0L) ){ case ((ls, j, k), i) =>
    if (i == 0 || j + i >= tmo) (sid(k + 1) :: ls, 0L, k + 1) else
       (sid(k) :: ls, j + i, k)
  }._1.reverse

  clickList zip sessList
}

Note that the accumulator for foldLeft in the UDF is a tuple (ls, j, k), where:

  • ls is the list of formatted session ids to be returned
  • j and k carry state into the next iteration: j is the time accumulated since the current session started (reset to 0 at each boundary), and k is the current session id number; a standalone sketch of this fold follows
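As a minimal standalone check of the fold (plain Scala, no Spark needed; tsList here is u1's ts_diff column from Step 1 below, and plain numbers stand in for the formatted ids):

// Replay the fold on u1's gaps with the 2-hour cap (7200 seconds).
val tmo = 7200L
val tsList = Seq(0L, 0L, 3000L, 3000L, 3000L, 3000L, 3000L, 1800L)

val sessNums = tsList.foldLeft((List[Long](), 0L, 0L)) { case ((ls, j, k), i) =>
  if (i == 0 || j + i >= tmo) ((k + 1) :: ls, 0L, k + 1)  // boundary: reset j, bump k
  else (k :: ls, j + i, k)                                // same session: accumulate j
}._1.reverse

// sessNums == List(1, 2, 2, 2, 3, 3, 3, 4), i.e. u1-1, u1-2, ..., u1-4 in the final output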

Step 1:

val tmo1: Long = 60 * 60      // rule #1: 1 hour of inactivity, in seconds
val tmo2: Long = 2 * 60 * 60  // rule #2: 2-hour session cap, in seconds

val win1 = Window.partitionBy("user_id").orderBy("click_time")

val df1 = userActivity.
  withColumn("ts_diff", unix_timestamp($"click_time") - unix_timestamp(
    lag($"click_time", 1).over(win1))
  ).
  // zero out the diff at rule-#1 session starts: the partition's first row,
  // or any gap of at least one hour
  withColumn("ts_diff", when(row_number.over(win1) === 1 || $"ts_diff" >= tmo1, 0L).
    otherwise($"ts_diff")
  )

df1.show
// +-------------------+-------+-------+
// |         click_time|user_id|ts_diff|
// +-------------------+-------+-------+
// |2018-01-01 11:00:00|     u1|      0|
// |2018-01-01 12:10:00|     u1|      0|
// |2018-01-01 13:00:00|     u1|   3000|
// |2018-01-01 13:50:00|     u1|   3000|
// |2018-01-01 14:40:00|     u1|   3000|
// |2018-01-01 15:30:00|     u1|   3000|
// |2018-01-01 16:20:00|     u1|   3000|
// |2018-01-01 16:50:00|     u1|   1800|
// |2018-01-01 11:00:00|     u2|      0|
// |2018-01-02 11:00:00|     u2|      0|
// +-------------------+-------+-------+
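As a quick arithmetic check of the zeroing, using u1's rows:

// 11:00 -> 12:10 : gap = 4200s >= tmo1 (3600s) => zeroed, rule-#1 session start
// 12:10 -> 13:00 : gap = 3000s <  tmo1         => kept as 3000
// 16:20 -> 16:50 : gap = 1800s <  tmo1         => kept as 1800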

Steps 2-4:

val df2 = df1.
  groupBy("user_id").agg(
    collect_list($"click_time").as("click_list"), collect_list($"ts_diff").as("ts_list")
  ).
  withColumn("click_sess_id",
    explode(clickSessList(tmo2)($"user_id", $"click_list", $"ts_list"))
  ).
  select($"user_id", $"click_sess_id._1".as("click_time"), $"click_sess_id._2".as("sess_id"))

df2.show
// +-------+-------------------+-------+
// |user_id|click_time         |sess_id|
// +-------+-------------------+-------+
// |u1     |2018-01-01 11:00:00|u1-1   |
// |u1     |2018-01-01 12:10:00|u1-2   |
// |u1     |2018-01-01 13:00:00|u1-2   |
// |u1     |2018-01-01 13:50:00|u1-2   |
// |u1     |2018-01-01 14:40:00|u1-3   |
// |u1     |2018-01-01 15:30:00|u1-3   |
// |u1     |2018-01-01 16:20:00|u1-3   |
// |u1     |2018-01-01 16:50:00|u1-4   |
// |u2     |2018-01-01 11:00:00|u2-1   |
// |u2     |2018-01-02 11:00:00|u2-2   |
// +-------+-------------------+-------+

Also note that click_time is "passed through" in steps 2-4 so as to be included in the final dataset.
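If it helps to validate the result, here is a small sanity check (a sketch, assuming df2 and the functions imported above): with these rules, every session's span must stay under the 2-hour cap.

// Span of each session in seconds; every value should be below 7200.
df2.groupBy("user_id", "sess_id")
  .agg((unix_timestamp(max($"click_time")) - unix_timestamp(min($"click_time"))).as("span_secs"))
  .orderBy("user_id", "sess_id")
  .show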
