Reliability issues with Checkpointing/WAL in Spark Streaming 1.6.0

Question

We have a Spark Streaming 1.5.2 application in Scala that reads JSON events from a Kinesis Stream, does some transformations/aggregations and writes the results to different S3 prefixes. The current batch interval is 60 seconds. We have 3000-7000 events/sec. We’re using checkpointing to protect us from losing aggregations.

It’s been working well for a while, recovering from exceptions and even cluster restarts. We recently recompiled the code for Spark Streaming 1.6.0, only changing the library dependencies in the build.sbt file. After running the code in a Spark 1.6.0 cluster for several hours, we’ve noticed the following:

  1. "Input Rate" and "Processing Time" volatility has increased substantially in 1.6.0 (see the screenshots below).
  2. Every few hours, an "Exception thrown while writing record: BlockAdditionEvent ... to the WriteAheadLog. java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]" exception is thrown (see the full stack trace below), coinciding with a drop to 0 events/sec for specific batches (minutes).

After doing some digging, I think the second issue looks related to this Pull Request. The initial goal of the PR: "When using S3 as a directory for WALs, the writes take too long. The driver gets very easily bottlenecked when multiple receivers send AddBlock events to the ReceiverTracker. This PR adds batching of events in the ReceivedBlockTracker so that receivers don’t get blocked by the driver for too long."

We are checkpointing in S3 in Spark 1.5.2 and there are no performance/reliability issues. We’ve tested checkpointing in Spark 1.6.0 in S3 and local NAS and in both cases we’re receiving this exception. It looks like when it takes more than 5 seconds to checkpoint a batch, this exception arises and we’ve checked that the events for that batch are lost forever.

  • Is the increase in "Input Rate" and "Processing Time" volatility expected in Spark Streaming 1.6.0 and is there any known way of improving it?

Do you know of any workaround apart from these 2?:

1) To guarantee that it takes less than 5 seconds for the checkpointing sink to write all files. In my experience, you cannot guarantee that with S3, even for small batches. For local NAS, it depends on who’s in charge of infrastructure (difficult with cloud providers).

2) Increase the spark.streaming.driver.writeAheadLog.batchingTimeout property value (a short configuration sketch follows below).
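
For illustration only, here is a minimal sketch of what raising this timeout programmatically on the SparkConf could look like. The two property keys are the standard Spark Streaming driver WAL settings; the app name, batch interval and the 15000 ms value are placeholders, not recommendations:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder app name and batch interval; the two .set() keys are the
// driver WAL properties discussed in this question (timeout in milliseconds).
val conf = new SparkConf()
  .setAppName("streaming-wal-example")
  .set("spark.streaming.driver.writeAheadLog.allowBatching", "true")
  .set("spark.streaming.driver.writeAheadLog.batchingTimeout", "15000")

val ssc = new StreamingContext(conf, Seconds(60))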

Would you expect to lose any events in the described scenario? I'd think that if batch checkpointing fails, the shard/receiver Sequence Numbers wouldn't be increased and it would be retried at a later time.

16/01/19 03:25:03 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(3521),Some(SequenceNumberRanges(SequenceNumberRange(StreamEventsPRD,shardId-000000000003,49558087746891612304997255299934807015508295035511636018,49558087746891612304997255303224294170679701088606617650), SequenceNumberRange(StreamEventsPRD,shardId-000000000004,49558087949939897337618579003482122196174788079896232002,49558087949939897337618579006984380295598368799020023874), SequenceNumberRange(StreamEventsPRD,shardId-000000000001,49558087735072217349776025034858012188384702720257294354,49558087735072217349776025038332464993957147037082320914), SequenceNumberRange(StreamEventsPRD,shardId-000000000009,49558088270111696152922722880993488801473174525649617042,49558088270111696152922722884455852348849472550727581842), SequenceNumberRange(StreamEventsPRD,shardId-000000000000,49558087841379869711171505550483827793283335010434154498,49558087841379869711171505554030816148032657077741551618), SequenceNumberRange(StreamEventsPRD,shardId-000000000002,49558087853556076589569225785774419228345486684446523426,49558087853556076589569225789389107428993227916817989666))),BlockManagerBasedStoreResult(input-0-1453142312126,Some(3521)))) to the WriteAheadLog.
java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:87)
    at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:321)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:500)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1230)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:498)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Source code extract

...
  // Function to create a new StreamingContext and set it up
  def setupContext(): StreamingContext = {
    ...
    // Create a StreamingContext
    val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))

    // Create a Kinesis DStream
    val data = KinesisUtils.createStream(ssc,
      kinesisAppName, kinesisStreamName,
      kinesisEndpointUrl, RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName(),
      InitialPositionInStream.LATEST, Seconds(kinesisCheckpointIntervalSeconds),
      StorageLevel.MEMORY_AND_DISK_SER_2, awsAccessKeyId, awsSecretKey)
...
    ssc.checkpoint(checkpointDir)

    ssc
  }


  // Get or create a streaming context.
  val ssc = StreamingContext.getActiveOrCreate(checkpointDir, setupContext)

  ssc.start()
  ssc.awaitTermination()

Answer

Following zero323's suggestion about posting my comment as an answer:

Increasing spark.streaming.driver.writeAheadLog.batchingTimeout solved the checkpointing timeout issue. We did it after making sure we had room for it. We have been testing it for a while now. So I only recommend increasing it after careful consideration.

Details

We used these 2 settings in $SPARK_HOME/conf/spark-defaults.conf:

spark.streaming.driver.writeAheadLog.allowBatching true
spark.streaming.driver.writeAheadLog.batchingTimeout 15000

Originally, we only had spark.streaming.driver.writeAheadLog.allowBatching set to true.

Before the change, we had reproduced the issue mentioned in the question ("...ReceivedBlockTracker: Exception thrown while writing record...") in a testing environment. It occurred every few hours. After the change, the issue disappeared. We ran it for several days before moving to production.

We had found that the getBatchingTimeout() method of the WriteAheadLogUtils class had a default value of 5000ms, as seen here:

def getBatchingTimeout(conf: SparkConf): Long = {
    conf.getLong(DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY, defaultValue = 5000)
}
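
For completeness, the same key can be read back from the driver's configuration to sanity-check which timeout is actually in effect. A small sketch, assuming sc is the SparkContext created in the source extract above and that the key corresponds to the spark.streaming.driver.writeAheadLog.batchingTimeout property set earlier:

// Sketch: read the effective WAL batching timeout from the driver's conf.
// 5000L mirrors the default value shown in the Spark excerpt above.
val walTimeoutMs = sc.getConf.getLong(
  "spark.streaming.driver.writeAheadLog.batchingTimeout", defaultValue = 5000L)
println(s"Driver WAL batching timeout: $walTimeoutMs ms")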
