如何使用mapWithState提取超时会话 [英] How to extract timed-out sessions using mapWithState

查看:23
本文介绍了如何使用mapWithState提取超时会话的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在更新代码,以便从updateStateByKey切换到mapWithState,以便基于2分钟的超时获得用户会话(2分钟仅用于测试目的)。每个会话应在超时之前聚合会话内的所有流数据(JSON字符串)。

这是我的旧代码:

val membersSessions = stream.map[(String, (Long, Long, List[String]))](eventRecord => {
  val parsed = Utils.parseJSON(eventRecord)
  val member_id = parsed.getOrElse("member_id", "")
  val timestamp = parsed.getOrElse("timestamp", "").toLong
  //The timestamp is returned twice because the first one will be used as the start time and the second one as the end time
  (member_id, (timestamp, timestamp, List(eventRecord)))
})

val latestSessionInfo = membersSessions.map[(String, (Long, Long, Long, List[String]))](a => {
  //transform to (member_id, (time, time, counter, events within session))
  (a._1, (a._2._1, a._2._2, 1, a._2._3))
}).
  reduceByKey((a, b) => {
    //transform to (member_id, (lowestStartTime, MaxFinishTime, sumOfCounter, events within session))
    (Math.min(a._1, b._1), Math.max(a._2, b._2), a._3 + b._3, a._4 ++ b._4)
  }).updateStateByKey(Utils.updateState)
updateStateByKey的问题得到了很好的解释here。我决定使用mapWithState的主要原因之一是,updateStateByKey无法返回已完成的会话(已超时的会话)进行进一步处理。

这是我第一次尝试将旧代码转换为新版本:

val spec = StateSpec.function(updateState _).timeout(Minutes(1))
val latestSessionInfo = membersSessions.map[(String, (Long, Long, Long, List[String]))](a => {
  //transform to (member_id, (time, time, counter, events within session))
  (a._1, (a._2._1, a._2._2, 1, a._2._3))
})
val userSessionSnapshots = latestSessionInfo.mapWithState(spec).snapshotStream()

我稍微误解了updateState应该是什么内容,因为据我所知,超时不应该手动计算(以前是在我的函数Utils.updateState中计算的),.snapshotStream应该返回超时会话。

推荐答案

假设您始终等待2分钟的超时,则可以使mapWithState流仅在触发超时后输出数据。

这对您的代码意味着什么?这意味着您现在需要监视超时,而不是在每次迭代中输出元组。我想您的mapWithState看起来应该是这样的:

def updateState(key: String,
                value: Option[(Long, Long, Long, List[String])],
                state: State[(Long, Long, Long, List[String])]): Option[(Long, Long, Long, List[String])] = {
  def reduce(first: (Long, Long, Long, List[String]), second: (Long, Long, Long, List[String])) = {
    (Math.min(first._1, second._1), Math.max(first._2, second._2), first._3 + second._3, first._4 ++ second._4)
  }

  value match {
    case Some(currentValue) =>
      val result = state
        .getOption()
        .map(currentState => reduce(currentState, currentValue))
        .getOrElse(currentValue)
      state.update(result)
      None
    case _ if state.isTimingOut() => state.getOption()
  }
}

这样,如果状态已超时,则仅将某些内容从外部输出到流,否则在状态内部将其聚合。

这意味着您的电光数据流图表可以过滤取出所有未定义的值,并且只保留以下值:

latestSessionInfo
 .mapWithState(spec)
 .filter(_.isDefined)

filter之后,您将只有超时的状态。

这篇关于如何使用mapWithState提取超时会话的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆