Is proper event-time sessionization possible with Spark Structured Streaming?

Problem description

I've been playing around with Spark Structured Streaming and mapGroupsWithState (specifically following the StructuredSessionization example in the Spark source). I want to confirm some limitations I believe exist with mapGroupsWithState given my use case.

For my purposes, a session is a group of uninterrupted activity for a user such that no two consecutive events, ordered chronologically by event time (not processing time), are separated by more than some developer-defined duration (30 minutes is common).

An example will help before jumping into code:

{"event_time": "2018-01-01T00:00:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:01:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:05:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:45:00", "user_id": "mike"}

For the stream above, a session ends after 30 minutes of inactivity. In a streaming context, we should end up with one completed session (the second has yet to complete):

[
  {
    "user_id": "mike",
    "startTimestamp": "2018-01-01T00:00:00",
    "endTimestamp": "2018-01-01T00:05:00"
  }
]
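To make the session definition concrete, here is a minimal plain-Scala sketch (no Spark involved) that splits one user's event times into sessions whenever two consecutive events are more than the gap apart. `GapSessions`, `Session`, and `sessionize` are hypothetical names introduced only for illustration, and the sketch assumes that a gap of exactly 30 minutes still counts as the same session ("not more than").

```scala
import java.time.{Duration, LocalDateTime}

// Hypothetical helper for illustration; not part of Spark or the original question.
object GapSessions {
  final case class Session(start: LocalDateTime, end: LocalDateTime)

  // Groups event times into sessions: a new session starts whenever two
  // consecutive events (ordered by event time) are more than `gap` apart.
  // Assumption: a gap of exactly `gap` still belongs to the same session.
  def sessionize(eventTimes: Seq[LocalDateTime], gap: Duration): List[Session] = {
    val sorted = eventTimes.sortWith(_.isBefore(_)).toList
    val reversed = sorted.foldLeft(List.empty[Session]) {
      case (current :: done, t) if !t.isAfter(current.end.plus(gap)) =>
        current.copy(end = t) :: done // within the gap: extend the open session
      case (acc, t) =>
        Session(t, t) :: acc          // first event or gap exceeded: new session
    }
    reversed.reverse
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      "2018-01-01T00:00:00", "2018-01-01T00:01:00",
      "2018-01-01T00:05:00", "2018-01-01T00:45:00"
    ).map(s => LocalDateTime.parse(s))
    sessionize(events, Duration.ofMinutes(30)).foreach(println)
  }
}
```

Run on mike's four events with a 30-minute gap, it yields two sessions, 00:00 to 00:05 and 00:45 to 00:45. In a streaming setting only the first can be reported as complete, because the second stays open until no further event can arrive within 30 minutes of 00:45.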

Now consider the following Spark driver program:

import java.sql.Timestamp

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

object StructuredSessionizationV2 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[2]")
      .appName("StructuredSessionizationRedux")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    implicit val ctx = spark.sqlContext
    val input = MemoryStream[String]

    val EVENT_SCHEMA = new StructType()
      .add($"event_time".string)
      .add($"user_id".string)

    val events = input.toDS()
      .select(from_json($"value", EVENT_SCHEMA).alias("json"))
      .select($"json.*")
      .withColumn("event_time", to_timestamp($"event_time"))
      .withWatermark("event_time", "1 hours")
    events.printSchema()

    val sessionized = events
      .groupByKey(row => row.getAs[String]("user_id"))
      .mapGroupsWithState[SessionState, SessionOutput](GroupStateTimeout.EventTimeTimeout) {
      case (userId: String, events: Iterator[Row], state: GroupState[SessionState]) =>
        println(s"state update for user ${userId} (current watermark: ${new Timestamp(state.getCurrentWatermarkMs())})")
        if (state.hasTimedOut) {
          println(s"User ${userId} has timed out, sending final output.")
          val finalOutput = SessionOutput(
            userId = userId,
            startTimestampMs = state.get.startTimestampMs,
            endTimestampMs = state.get.endTimestampMs,
            durationMs = state.get.durationMs,
            expired = true
          )
          // Drop this user's state
          state.remove()
          finalOutput
        } else {
          // Materialize once: the events iterator can only be traversed a single time
          val timestamps = events.map(_.getAs[Timestamp]("event_time").getTime).toList
          println(s"User ${userId} has new events (min: ${new Timestamp(timestamps.min)}, max: ${new Timestamp(timestamps.max)}).")
          val newState = if (state.exists) {
            println(s"User ${userId} has existing state.")
            val oldState = state.get
            SessionState(
              startTimestampMs = math.min(oldState.startTimestampMs, timestamps.min),
              endTimestampMs = math.max(oldState.endTimestampMs, timestamps.max)
            )
          } else {
            println(s"User ${userId} has no existing state.")
            SessionState(
              startTimestampMs = timestamps.min,
              endTimestampMs = timestamps.max
            )
          }
          state.update(newState)
          // Event-time timeout: the state expires once the watermark passes endTimestampMs + 30 minutes
          state.setTimeoutTimestamp(newState.endTimestampMs, "30 minutes")
          println(s"User ${userId} state updated. Timeout now set to ${new Timestamp(newState.endTimestampMs + (30 * 60 * 1000))}")
          SessionOutput(
            userId = userId,
            startTimestampMs = state.get.startTimestampMs,
            endTimestampMs = state.get.endTimestampMs,
            durationMs = state.get.durationMs,
            expired = false
          )
        }
      }

    val eventsQuery = sessionized
      .writeStream
      .queryName("events")
      .outputMode("update")
      .format("console")
      .start()

    input.addData(
      """{"event_time": "2018-01-01T00:00:00", "user_id": "mike"}""",
      """{"event_time": "2018-01-01T00:01:00", "user_id": "mike"}""",
      """{"event_time": "2018-01-01T00:05:00", "user_id": "mike"}"""
    )
    input.addData(
      """{"event_time": "2018-01-01T00:45:00", "user_id": "mike"}"""
    )
    eventsQuery.processAllAvailable()
  }

  case class SessionState(startTimestampMs: Long, endTimestampMs: Long) {
    def durationMs: Long = endTimestampMs - startTimestampMs
  }

  case class SessionOutput(userId: String, startTimestampMs: Long, endTimestampMs: Long, durationMs: Long, expired: Boolean)
}

The output of that program is:

root
 |-- event_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)

state update for user mike (current watermark: 1969-12-31 19:00:00.0)
User mike has new events (min: 2018-01-01 00:00:00.0, max: 2018-01-01 00:05:00.0).
User mike has no existing state.
User mike state updated. Timeout now set to 2018-01-01 00:35:00.0
-------------------------------------------
Batch: 0
-------------------------------------------
+------+----------------+--------------+----------+-------+
|userId|startTimestampMs|endTimestampMs|durationMs|expired|
+------+----------------+--------------+----------+-------+
|  mike|   1514782800000| 1514783100000|    300000|  false|
+------+----------------+--------------+----------+-------+

state update for user mike (current watermark: 2017-12-31 23:05:00.0)
User mike has new events (min: 2018-01-01 00:45:00.0, max: 2018-01-01 00:45:00.0).
User mike has existing state.
User mike state updated. Timeout now set to 2018-01-01 01:15:00.0
-------------------------------------------
Batch: 1
-------------------------------------------
+------+----------------+--------------+----------+-------+
|userId|startTimestampMs|endTimestampMs|durationMs|expired|
+------+----------------+--------------+----------+-------+
|  mike|   1514782800000| 1514785500000|   2700000|  false|
+------+----------------+--------------+----------+-------+

Given my session definition, the single event in the second batch should trigger an expiry of session state and thus a new session. However, since the watermark (2017-12-31 23:05:00.0) has not passed the state's timeout (2018-01-01 00:35:00.0), state isn't expired and the event is erroneously added to the existing session despite the fact that more than 30 minutes have passed since the latest timestamp in the previous batch.

I think the only way for session state expiration to work as I'm hoping is if enough events from different users were received within the batch to advance the watermark past the state timeout for mike.
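The arithmetic behind this can be checked with a small plain-Scala model of the event-time timeout. It assumes, as the console output above suggests, that each trigger uses a watermark computed from the maximum event time seen in earlier triggers minus the 1-hour delay, and that the state times out only once that watermark is strictly later than the timeout timestamp. `WatermarkCheck` and its methods are hypothetical names, not Spark APIs.

```scala
import java.time.{Duration, LocalDateTime}

// Hypothetical model of the timeout check; it mirrors the question's settings
// but does not call Spark.
object WatermarkCheck {
  val delay: Duration = Duration.ofHours(1)  // withWatermark("event_time", "1 hours")
  val gap: Duration = Duration.ofMinutes(30) // setTimeoutTimestamp(end, "30 minutes")

  // Watermark seen by a trigger, derived from the max event time of earlier triggers.
  def watermark(maxEventTimeSoFar: LocalDateTime): LocalDateTime =
    maxEventTimeSoFar.minus(delay)

  // Assumed rule: the state times out once the watermark passes end + gap.
  def timedOut(sessionEnd: LocalDateTime, wm: LocalDateTime): Boolean =
    wm.isAfter(sessionEnd.plus(gap))

  // The max event time (across all users) must exceed this for timedOut to hold.
  def eventTimeNeeded(sessionEnd: LocalDateTime): LocalDateTime =
    sessionEnd.plus(gap).plus(delay)

  def main(args: Array[String]): Unit = {
    val mikeEnd = LocalDateTime.parse("2018-01-01T00:05:00")
    val wm = watermark(mikeEnd) // batch 0's max event time was mike's 00:05 event
    println(s"watermark=$wm timeout=${mikeEnd.plus(gap)} timedOut=${timedOut(mikeEnd, wm)}")
    println(s"max event time must exceed ${eventTimeNeeded(mikeEnd)}")
  }
}
```

With mike's session ending at 00:05, the watermark for batch 1 is 2017-12-31 23:05, well short of the 00:35 timeout, so the state cannot expire. Under these assumptions it would only expire after some user's event later than 01:35 had been processed in a previous trigger.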

I suppose one could also mess with the stream's watermark, but I can't think of how I'd do that to accomplish my use case.

Is this accurate? Am I missing anything in how to properly do event time-based sessionization in Spark?

Recommended answer

The implementation you have provided does not seem to work if the watermark interval is greater than the session gap duration.

For the logic you have shown to work, you need to set the watermark interval to less than 30 minutes.

If you really want the watermark interval to be independent of (or more than) the session gap duration, you need to wait until the watermark passes (watermark + gap) to expire the state. The merging logic also seems to merge the windows blindly: before adding new events to an existing session, it should check whether they fall within the gap duration of that session's end.
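A gap-aware merge along the lines the answer suggests could look like the following plain-Scala sketch: each new event is compared with the end of the open session, and if it is more than the gap away, the open session is closed and a new one is started. `GapAwareMerge` and `merge` are hypothetical names; the sketch reuses the shape of the question's `SessionState` and assumes no new event is earlier than the open session's start minus the gap (fully out-of-order data would require keeping several sessions in state).

```scala
// Hypothetical helper for illustration; not part of Spark or the original question.
object GapAwareMerge {
  // Same shape as the question's SessionState (epoch milliseconds).
  final case class SessionState(startTimestampMs: Long, endTimestampMs: Long)

  // Folds new event times into the currently open session (if any). Returns the
  // sessions closed because a gap longer than gapMs was seen, plus the session
  // that is still open. Assumes no new event is earlier than open.start - gapMs.
  def merge(
      open: Option[SessionState],
      newEventsMs: Seq[Long],
      gapMs: Long
  ): (List[SessionState], Option[SessionState]) = {
    val (closed, stillOpen) =
      newEventsMs.sorted.foldLeft((List.empty[SessionState], open)) {
        case ((done, Some(s)), t) if t - s.endTimestampMs > gapMs =>
          (s :: done, Some(SessionState(t, t))) // gap exceeded: close s, start anew
        case ((done, Some(s)), t) =>
          // within the gap (possibly slightly out of order): widen the session
          (done, Some(SessionState(math.min(s.startTimestampMs, t), math.max(s.endTimestampMs, t))))
        case ((done, None), t) =>
          (done, Some(SessionState(t, t)))      // no open session yet
      }
    (closed.reverse, stillOpen)
  }
}
```

Applied to the question's batch 1 (open session 1514782800000 to 1514783100000, new event at 1514785500000, gap of 30 minutes), it returns the first session as closed and a new open session starting at 1514785500000. Because `mapGroupsWithState` returns exactly one output per call while a single trigger may close one or more sessions, `flatMapGroupsWithState` (whose function returns an `Iterator` of outputs) is likely the better fit for wiring this into the stream.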
