Why does memory usage of Spark worker increase with time?


Problem description

I have a Spark Streaming application running which uses the mapWithState function to track the state of an RDD. The application runs fine for a few minutes but then crashes with:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 373

I observed that the memory usage of the Spark application increases linearly over time, even though I have set a timeout for the mapWithStateRDD. Please see the code snippet and memory usage below:

val completedSess = sessionLines
                    .mapWithState(StateSpec.function(trackStateFunction _)
                    .numPartitions(80)
                    .timeout(Minutes(5)))

Why should the memory increase linearly over time if there is an explicit timeout for each RDD?

I have tried increasing the memory, but it does not help. What am I missing?

Edit - code for reference:

def trackStateFunction(batchTime: Time, key: String, value: Option[String], state: State[(Boolean, List[String], Long)]): Option[(Boolean, List[String])] = {

  def updateSessions(newLine: String): Option[(Boolean, List[String])] = {
    val currentTime = System.currentTimeMillis() / 1000

    if (state.exists()) {
      val newLines = state.get()._2 :+ newLine

      //check if end of Session reached.
      // if yes, remove the state and return. Else update the state
      if (isEndOfSessionReached(value.getOrElse(""), state.get()._3)) {
        state.remove()
        Some(true, newLines)
      }
      else {
        val newState = (false, newLines, currentTime)
        state.update(newState)
        Some(state.get()._1, state.get()._2)
      }
    }
    else  {
      val newState = (false, List(value.get), currentTime)
      state.update(newState)
      Some(state.get()._1, state.get()._2)
    }
  }

  value match {
    case Some(newLine) => updateSessions(newLine)
    case _ if state.isTimingOut() => Some(true, state.get()._2)
    case _ => {
      println("Not matched to any expression")
      None
    }
  }
}
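
The question does not include isEndOfSessionReached, which the function above calls with the current line and the last-update timestamp from the state tuple. Purely for illustration, a hypothetical placeholder with that shape might look like the sketch below; the sentinel string and the 30-minute cutoff are made-up assumptions, not part of the question.

// Hypothetical placeholder, not from the question: treats a sentinel marker
// in the line, or a session idle for more than 30 minutes, as the end of the session.
def isEndOfSessionReached(line: String, lastUpdatedEpochSeconds: Long): Boolean = {
  val now = System.currentTimeMillis() / 1000
  line.contains("END_OF_SESSION") || (now - lastUpdatedEpochSeconds) > 30 * 60
}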

Recommended answer

According to the mapWithState documentation, the state specification (StateSpec) supports the following:

An initial state as an RDD - you can load the initial state from some store and then start your streaming job with that state.

Number of partitions - the key-value state DStream is partitioned by key. If you have a good estimate of the size of the state beforehand, you can provide the number of partitions to partition it accordingly.

Partitioner - you can also provide a custom partitioner. The default is a hash partitioner. If you have a good understanding of the key space, you can provide a custom partitioner that performs more efficient updates than the default hash partitioner.

Timeout - this ensures that keys whose values have not been updated for a specific period of time are removed from the state. This helps clean up stale keys from the state.
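
Taken together, those options are all set on the StateSpec passed to mapWithState. A minimal sketch of how they combine, assuming a trackStateFunction like the one in the question and a hypothetical initialStateRDD loaded from an external store:

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.{Minutes, StateSpec}

// initialStateRDD is a placeholder: an RDD[(String, (Boolean, List[String], Long))]
// loaded from wherever previous state was persisted.
val spec = StateSpec
  .function(trackStateFunction _)
  .initialState(initialStateRDD)        // optional: seed the state from a previous run
  .numPartitions(80)                    // partition the state by key
  .partitioner(new HashPartitioner(80)) // the default; swap in a custom partitioner if you know the key space
  .timeout(Minutes(5))                  // evict keys not updated for 5 minutes

val completedSess = sessionLines.mapWithState(spec)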

So the timeout only takes care of cleaning up keys that have not been updated for a while. The memory still fills up and the job eventually fails because the executors do not have enough memory assigned, which is what produces the MetadataFetchFailedException. By increasing the memory I assume you mean the executors; even then it will probably not help, since the stream keeps coming in. With mapWithState, sessionLines contains the same number of records as the input DStream, so the way to fix this is to make your DStream smaller. On the streaming context you can set the batch interval, which will most likely solve this:

val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))

Also remember to take a snapshot and a checkpoint once in a while. The snapshots allow you to use the state from the stream, which would otherwise be lost, in other computations. Hopefully this helps; for more information see: https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html and http://asyncified.io/2016/07/31/exploring-stateful-streaming-with-apache-spark/
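
A minimal sketch of what that looks like, assuming the spec and ssc from above; the checkpoint directory is a placeholder path:

// Checkpointing must be enabled for mapWithState; the directory is hypothetical.
ssc.checkpoint("/path/to/checkpoint/dir")

val stateStream = sessionLines.mapWithState(spec)

// stateSnapshots() emits the full set of (key, state) pairs each batch,
// so the current sessions can be inspected or persisted elsewhere.
stateStream.stateSnapshots().foreachRDD { rdd =>
  println(s"Active sessions: ${rdd.count()}")
}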
