Spark streaming mapWithState timeout delayed?

Problem description

I expected the new mapWithState API of Spark 1.6+ to remove timed-out objects near-immediately, but there is a delay.

I'm running a simple mapWithState job with a 1-second batch interval and a 5-second state timeout, together with nc (nc -l -p <port>) as the socket source.
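The original snippet is elided, but a minimal sketch of this kind of setup could look as follows (the host, port 9999, checkpoint directory, and word-count mapping function are illustrative assumptions, not the original code):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object MapWithStateTimeoutDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("MapWithStateTimeoutDemo")
    val ssc = new StreamingContext(conf, Seconds(1)) // 1-second batch interval
    ssc.checkpoint("/tmp/spark-checkpoint")          // mapWithState requires a checkpoint dir

    // Word count per key; when a key is timing out, the mapping function is called
    // one last time with isTimingOut == true and no new value.
    def trackState(word: String, count: Option[Int], state: State[Int]): (String, Int) = {
      if (state.isTimingOut()) {
        (word, state.get()) // final call for an expiring key
      } else {
        val sum = count.getOrElse(0) + state.getOption().getOrElse(0)
        state.update(sum)
        (word, sum)
      }
    }

    val words = ssc.socketTextStream("localhost", 9999) // fed by: nc -l -p 9999
      .flatMap(_.split(" "))
      .map((_, 1))

    val stateStream = words.mapWithState(
      StateSpec.function(trackState _).timeout(Seconds(5))) // 5-second idle timeout
    stateStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}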

When I type a word into the nc window, I see the tuple printed in the console every second. But the timing-out message does not get printed 5 seconds later, as expected based on the timeout that was set; the time it takes for the tuple to expire seems to vary between 5 and 20 seconds.

Am I missing some configuration option, or is the timeout perhaps only performed at the same time as checkpoints?

Recommended answer

Once an event times out, it is NOT deleted right away; it is only marked for deletion by saving it to a 'deltaMap':

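// From Spark's OpenHashMapBasedStateMap: remove() only marks an entry as deleted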
override def remove(key: K): Unit = {
  val stateInfo = deltaMap(key)
  if (stateInfo != null) {
    stateInfo.markDeleted()
  } else {
    val newInfo = new StateInfo[S](deleted = true)
    deltaMap.update(key, newInfo)
  }
}

Then, timed-out events are collected and sent to the output stream only at checkpoint time. That is, events that time out at batch t will appear in the output stream only at the next checkpoint; by default this means after 5 batch intervals on average, i.e. at batch t+5:

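// From Spark's MapWithStateRDD / MapWithStateRDDRecord: a checkpoint enables the
// full scan that processes timeouts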
override def checkpoint(): Unit = {
  super.checkpoint()
  doFullScan = true
}

...

removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled

...

// Get the timed out state records, call the mapping function on each and collect the
// data returned
if (removeTimedoutData && timeoutThresholdTime.isDefined) {
...

Elements are actually removed only when enough of them have accumulated and the state map is being serialized, which currently also happens only at checkpoint:

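// From Spark's OpenHashMapBasedStateMap: compaction happens while serializing the map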
/** Whether the delta chain length is long enough that it should be compacted */
def shouldCompact: Boolean = {
  deltaChainLength >= deltaChainThreshold
}

// Write the data in the parent state map while copying the data into a new parent map for
// compaction (if needed)
val doCompaction = shouldCompact
...

By default, checkpointing occurs every 10 batch intervals; in the example above, with 1-second batches, that is every 10 seconds. Since your timeout is 5 seconds, events are expected to expire within 5 to 15 seconds.
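If this latency matters, the checkpoint interval of the state stream can be shortened so that the full scan, and with it the timeout processing, runs more often, at the cost of more frequent checkpoint I/O. A minimal sketch, assuming the stateStream variable from the setup above (worth verifying against your Spark version):

// Checkpoint, and therefore run the timeout full scan, every 2 batches instead of
// the default 10; the interval is forwarded to the internal state stream.
stateStream.checkpoint(Seconds(2))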

Corrected and elaborated answer following comments by @YuvalItzchakov
