Is proper event-time sessionization possible with Spark Structured Streaming?


Problem Description

I've been playing around with Spark Structured Streaming and mapGroupsWithState (specifically following the StructuredSessionization example in the Spark source). I want to confirm some limitations I believe exist with mapGroupsWithState given my use case.

A session for my purposes is a period of uninterrupted activity for a user, such that no two chronologically ordered (by event time, not processing time) events are separated by more than some developer-defined duration (30 minutes is common).

An example will help before jumping into code:

{"event_time": "2018-01-01T00:00:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:01:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:05:00", "user_id": "mike"}
{"event_time": "2018-01-01T00:45:00", "user_id": "mike"}

For the stream above, a session is defined by a 30-minute period of inactivity. In a streaming context, we should end up with one session (the second has yet to complete):

[
  {
    "user_id": "mike",
    "startTimestamp": "2018-01-01T00:00:00",
    "endTimestamp": "2018-01-01T00:05:00"
  }
]

Now consider the following Spark driver program:

import java.sql.Timestamp

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

object StructuredSessionizationV2 {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .master("local[2]")
      .appName("StructredSessionizationRedux")
      .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    import spark.implicits._

    implicit val ctx = spark.sqlContext
    val input = MemoryStream[String]

    val EVENT_SCHEMA = new StructType()
      .add($"event_time".string)
      .add($"user_id".string)

    val events = input.toDS()
      .select(from_json($"value", EVENT_SCHEMA).alias("json"))
      .select($"json.*")
      .withColumn("event_time", to_timestamp($"event_time"))
      .withWatermark("event_time", "1 hours")
    events.printSchema()

    val sessionized = events
      .groupByKey(row => row.getAs[String]("user_id"))
      .mapGroupsWithState[SessionState, SessionOutput](GroupStateTimeout.EventTimeTimeout) {
      case (userId: String, events: Iterator[Row], state: GroupState[SessionState]) =>
        println(s"state update for user ${userId} (current watermark: ${new Timestamp(state.getCurrentWatermarkMs())})")
        if (state.hasTimedOut) {
          println(s"User ${userId} has timed out, sending final output.")
          val finalOutput = SessionOutput(
            userId = userId,
            startTimestampMs = state.get.startTimestampMs,
            endTimestampMs = state.get.endTimestampMs,
            durationMs = state.get.durationMs,
            expired = true
          )
          // Drop this user's state
          state.remove()
          finalOutput
        } else {
          val timestamps = events.map(_.getAs[Timestamp]("event_time").getTime).toSeq
          println(s"User ${userId} has new events (min: ${new Timestamp(timestamps.min)}, max: ${new Timestamp(timestamps.max)}).")
          val newState = if (state.exists) {
            println(s"User ${userId} has existing state.")
            val oldState = state.get
            SessionState(
              startTimestampMs = math.min(oldState.startTimestampMs, timestamps.min),
              endTimestampMs = math.max(oldState.endTimestampMs, timestamps.max)
            )
          } else {
            println(s"User ${userId} has no existing state.")
            SessionState(
              startTimestampMs = timestamps.min,
              endTimestampMs = timestamps.max
            )
          }
          state.update(newState)
          state.setTimeoutTimestamp(newState.endTimestampMs, "30 minutes")
          println(s"User ${userId} state updated. Timeout now set to ${new Timestamp(newState.endTimestampMs + (30 * 60 * 1000))}")
          SessionOutput(
            userId = userId,
            startTimestampMs = state.get.startTimestampMs,
            endTimestampMs = state.get.endTimestampMs,
            durationMs = state.get.durationMs,
            expired = false
          )
        }
      }

    val eventsQuery = sessionized
      .writeStream
      .queryName("events")
      .outputMode("update")
      .format("console")
      .start()

    input.addData(
      """{"event_time": "2018-01-01T00:00:00", "user_id": "mike"}""",
      """{"event_time": "2018-01-01T00:01:00", "user_id": "mike"}""",
      """{"event_time": "2018-01-01T00:05:00", "user_id": "mike"}"""
    )
    input.addData(
      """{"event_time": "2018-01-01T00:45:00", "user_id": "mike"}"""
    )
    eventsQuery.processAllAvailable()
  }

  case class SessionState(startTimestampMs: Long, endTimestampMs: Long) {
    def durationMs: Long = endTimestampMs - startTimestampMs
  }

  case class SessionOutput(userId: String, startTimestampMs: Long, endTimestampMs: Long, durationMs: Long, expired: Boolean)
}

The output of this program is:

root
 |-- event_time: timestamp (nullable = true)
 |-- user_id: string (nullable = true)

state update for user mike (current watermark: 1969-12-31 19:00:00.0)
User mike has new events (min: 2018-01-01 00:00:00.0, max: 2018-01-01 00:05:00.0).
User mike has no existing state.
User mike state updated. Timeout now set to 2018-01-01 00:35:00.0
-------------------------------------------
Batch: 0
-------------------------------------------
+------+----------------+--------------+----------+-------+
|userId|startTimestampMs|endTimestampMs|durationMs|expired|
+------+----------------+--------------+----------+-------+
|  mike|   1514782800000| 1514783100000|    300000|  false|
+------+----------------+--------------+----------+-------+

state update for user mike (current watermark: 2017-12-31 23:05:00.0)
User mike has new events (min: 2018-01-01 00:45:00.0, max: 2018-01-01 00:45:00.0).
User mike has existing state.
User mike state updated. Timeout now set to 2018-01-01 01:15:00.0
-------------------------------------------
Batch: 1
-------------------------------------------
+------+----------------+--------------+----------+-------+
|userId|startTimestampMs|endTimestampMs|durationMs|expired|
+------+----------------+--------------+----------+-------+
|  mike|   1514782800000| 1514785500000|   2700000|  false|
+------+----------------+--------------+----------+-------+

Given my session definition, the single event in the second batch should trigger an expiry of session state and thus a new session. However, since the watermark (2017-12-31 23:05:00.0) has not passed the state's timeout (2018-01-01 00:35:00.0), state isn't expired and the event is erroneously added to the existing session despite the fact that more than 30 minutes have passed since the latest timestamp in the previous batch.
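
As a sanity check of where the logged watermark comes from (Spark computes the event-time watermark as, roughly, the maximum event time seen so far minus the withWatermark delay), the numbers from batch 0 line up:

val maxEventTimeMs = java.sql.Timestamp.valueOf("2018-01-01 00:05:00").getTime
val delayMs = 60 * 60 * 1000L                // the "1 hours" watermark delay
val watermarkMs = maxEventTimeMs - delayMs   // 2017-12-31 23:05:00, as logged
val timeoutMs = java.sql.Timestamp.valueOf("2018-01-01 00:35:00").getTime
assert(watermarkMs < timeoutMs)              // so mike's state cannot time out yet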

I think the only way for session state expiration to work as I'm hoping is if enough events from different users were received within the batch to advance the watermark past the state timeout for mike.
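
For example (a hypothetical extra batch, not part of the original program; "jane" is an invented user), a single late-enough event from another user would push the watermark past mike's timeout:

// Hypothetical: an event at 02:00:00 moves the watermark to 01:00:00
// (max event time minus the 1-hour delay), which is past mike's
// timeout of 2018-01-01 00:35:00.
input.addData(
  """{"event_time": "2018-01-01T02:00:00", "user_id": "jane"}"""
)
eventsQuery.processAllAvailable()
// On the next trigger, mike's group is invoked with hasTimedOut == true
// and the expired session is emitted.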

I suppose one could also mess with the stream's watermark, but I can't think of how I'd do that to accomplish my use case.

Is this accurate? Am I missing anything in how to properly do event time-based sessionization in Spark?

Answer

The implementation you have provided does not seem to work if the watermark interval is greater than the session gap duration.

For the logic you have shown to work, you need to set the watermark interval to less than 30 minutes.
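
For example, adapting the pipeline from the question (the only change is the watermark delay):

// Same pipeline as in the question, with the watermark delay reduced
// below the 30-minute session gap so the watermark can overtake a
// session's timeout timestamp.
val events = input.toDS()
  .select(from_json($"value", EVENT_SCHEMA).alias("json"))
  .select($"json.*")
  .withColumn("event_time", to_timestamp($"event_time"))
  .withWatermark("event_time", "10 minutes")  // was "1 hours"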

If you really want the watermark interval to be independent of (or longer than) the session gap duration, you need to wait until the watermark passes (the session's last event time + gap) before expiring the state. The merging logic seems to blindly merge the windows; it should take the gap duration into account before merging.
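
A minimal sketch of gap-aware merging (my reading of this suggestion, not code from the answer), replacing the unconditional merge inside the state function:

// Only extend the existing session when the new batch's earliest event
// falls within the 30-minute gap of the old session's end; otherwise
// the old session is finished and a new one begins.
val gapMs = 30 * 60 * 1000L
val newState = if (state.exists) {
  val oldState = state.get
  if (timestamps.min - oldState.endTimestampMs <= gapMs) {
    SessionState(
      startTimestampMs = math.min(oldState.startTimestampMs, timestamps.min),
      endTimestampMs = math.max(oldState.endTimestampMs, timestamps.max)
    )
  } else {
    // Gap exceeded: start a new session. Also emitting the closed session
    // here would require flatMapGroupsWithState, since mapGroupsWithState
    // returns exactly one record per group per trigger.
    SessionState(timestamps.min, timestamps.max)
  }
} else {
  SessionState(timestamps.min, timestamps.max)
}
state.update(newState)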
