Spark Streaming mapWithState seems to rebuild complete state periodically


Problem description


I am working on a Scala (2.11) / Spark (1.6.1) streaming project and using mapWithState() to keep track of seen data from previous batches.

The state is distributed in 20 partitions on multiple nodes, created with StateSpec.function(trackStateFunc _).numPartitions(20). In this state we have only a few keys (~100) mapped to Sets with up to ~160,000 entries, which grow throughout the application. The entire state is up to 3GB, which can be handled by each node in the cluster. In each batch, some data is added to the state but not deleted until the very end of the process, i.e. after ~15 minutes.
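For context, a minimal sketch of the kind of setup described above; the name trackStateFunc comes from the question, but its signature, the key/value types, and the events stream are assumptions made purely for illustration:

import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical mapping function: accumulate each key's values into a Set,
// mirroring the "few keys mapped to large Sets" layout described above.
def trackStateFunc(key: String, value: Option[String],
                   state: State[Set[String]]): (String, Int) = {
  val current = state.getOption().getOrElse(Set.empty[String])
  val updated = value.map(current + _).getOrElse(current)
  state.update(updated)   // state only grows; nothing is removed per batch
  (key, updated.size)
}

// events: DStream[(String, String)] produced elsewhere (assumption)
def withState(events: DStream[(String, String)]): DStream[(String, Int)] =
  events.mapWithState(
    StateSpec.function(trackStateFunc _).numPartitions(20)
  )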

While following the application UI, every 10th batch's processing time is very high compared to the other batches. See images:

The yellow fields represent the high processing time.

A more detailed Job view shows that in these batches, at a certain point, all 20 partitions are "skipped". Or at least that is what the UI says.

My understanding of "skipped" is that each state partition is one possible task which isn't executed, as it doesn't need to be recomputed. However, I don't understand why the number of skips varies in each Job and why the last Job requires so much processing. The higher processing time occurs regardless of the state's size; the size only affects the duration.

Is this a bug in the mapWithState() functionality or is this intended behaviour? Does the underlying data structure require some kind of reshuffling, does the Set in the state need to copy data? Or is it more likely to be a flaw in my application?

Solution

Is this a bug in the mapWithState() functionality or is this intended behaviour?

This is intended behavior. The spikes you're seeing are because your data gets checkpointed at the end of that given batch. If you look at the times of the longer batches, you'll see that it happens consistently every 100 seconds. That's because the checkpoint interval is constant: it is calculated from your batchDuration (how often you read a batch from your data source) multiplied by a constant, unless you explicitly set the DStream.checkpoint interval.

Here is the relevant piece of code from MapWithStateDStream:

// If no checkpoint interval has been set explicitly, default it to
// slideDuration times a fixed multiplier.
override def initialize(time: Time): Unit = {
  if (checkpointDuration == null) {
    checkpointDuration = slideDuration * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER
  }
  super.initialize(time)
}

Where DEFAULT_CHECKPOINT_DURATION_MULTIPLIER is:

private[streaming] object InternalMapWithStateDStream {
  private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10
}

Which lines up exactly with the behavior you're seeing, since your batch duration is 10 seconds => 10 * 10 = 100 seconds.
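As noted above, you can override the default multiplier by setting the interval yourself via DStream.checkpoint. A small sketch, reusing the hypothetical stream and StateSpec from the setup above, with an illustrative 60-second interval:

import org.apache.spark.streaming.Seconds

// Checkpoint the state stream every 60 seconds instead of the default
// batchDuration * 10; the concrete value here is only an example.
val stateStream = events.mapWithState(
  StateSpec.function(trackStateFunc _).numPartitions(20))
stateStream.checkpoint(Seconds(60))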

This is normal, and that is the cost of persisting state with Spark. An optimization on your side could be to think about how you can minimize the size of the state you have to keep in memory, so that this serialization is as quick as possible. Also, I hope you've turned on Kryo serialization instead of the default Java serialization, which can give you a meaningful performance boost.
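For reference, one way to enable Kryo as suggested; a minimal sketch where the application name and the registered class are only illustrative, not taken from the original application:

import org.apache.spark.SparkConf

// Switch from the default Java serialization to Kryo.
val conf = new SparkConf()
  .setAppName("MapWithStateJob")  // illustrative name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Optionally register the concrete classes held in state so Kryo does not
  // write their full class names; adjust to whatever your state actually holds.
  .registerKryoClasses(Array(classOf[scala.collection.immutable.HashSet[_]]))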
