Reliability issues with Checkpointing/WAL in Spark Streaming 1.6.0

Description

We have a Spark Streaming 1.5.2 application in Scala that reads JSON events from a Kinesis Stream, does some transformations/aggregations and writes the results to different S3 prefixes. The current batch interval is 60 seconds. We have 3000-7000 events/sec. We’re using checkpointing to protect us from losing aggregations.

It’s been working well for a while, recovering from exceptions and even cluster restarts. We recently recompiled the code for Spark Streaming 1.6.0, only changing the library dependencies in the build.sbt file. After running the code in a Spark 1.6.0 cluster for several hours, we’ve noticed the following:

  1. "Input Rate" and "Processing Time" volatility has increased substantially (see the screenshots below) in 1.6.0.
  2. Every few hours, there’s an ‘’Exception thrown while writing record: BlockAdditionEvent … to the WriteAheadLog. java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]" exception (see complete stack trace below) coinciding with the drop to 0 events/sec for specific batches (minutes).

After doing some digging, I think the second issue looks related to this Pull Request. The initial goal of the PR: "When using S3 as a directory for WALs, the writes take too long. The driver gets very easily bottlenecked when multiple receivers send AddBlock events to the ReceiverTracker. This PR adds batching of events in the ReceivedBlockTracker so that receivers don’t get blocked by the driver for too long."
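
To make the failure mode concrete, below is a minimal, hypothetical sketch of that "batch the writes, block the caller with a timeout" pattern. It is not Spark's actual BatchedWriteAheadLog code; SlowWal, BatchedWalSketch, and the 7-second delay are invented for illustration. A caller enqueues its record and blocks on a promise; if the background flush (for example, a slow S3 write) takes longer than the timeout, the caller gets exactly this kind of TimeoutException even though the write may still complete later:

import java.util.concurrent.LinkedBlockingQueue
import scala.collection.JavaConverters._
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for the real write-ahead log writer (e.g. one backed by S3).
// The sleep simulates a write that takes longer than the caller is willing to wait.
object SlowWal {
  def write(records: Seq[String]): Unit = Thread.sleep(7000)
}

// A queued record plus the promise its caller blocks on until the batch
// containing the record has been flushed.
final case class QueuedRecord(payload: String, done: Promise[Unit])

object BatchedWalSketch {
  private val queue = new LinkedBlockingQueue[QueuedRecord]()

  // Background flusher: drain whatever is queued, write it as a single batch,
  // then complete every promise in that batch (success or failure).
  private val flusher = new Thread(new Runnable {
    override def run(): Unit = {
      while (true) {
        val batch = new java.util.ArrayList[QueuedRecord]()
        batch.add(queue.take())   // block until at least one record arrives
        queue.drainTo(batch)      // pick up anything else already waiting
        val records = batch.asScala
        val result = Try(SlowWal.write(records.map(_.payload)))
        records.foreach(_.done.complete(result))
      }
    }
  })
  flusher.setDaemon(true)
  flusher.start()

  // Caller side: enqueue the record and wait at most `timeout` for the flusher.
  // This Await.result is the kind of call that raises
  // "java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]"
  // when the underlying write is slower than the timeout.
  def write(payload: String, timeout: FiniteDuration = 5.seconds): Unit = {
    val record = QueuedRecord(payload, Promise[Unit]())
    queue.put(record)
    Await.result(record.done.future, timeout)
  }
}

object Demo extends App {
  // With a 7 s simulated write and a 5 s timeout, this throws TimeoutException,
  // mirroring the behaviour described in the question.
  BatchedWalSketch.write("BlockAdditionEvent(...)")
}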

We are checkpointing to S3 in Spark 1.5.2 without any performance or reliability issues. We've tested checkpointing in Spark 1.6.0 against both S3 and a local NAS, and in both cases we receive this exception. It looks like the exception is raised whenever checkpointing a batch takes more than 5 seconds, and we've verified that the events for that batch are lost forever.

Questions

  • Is the increase in "Input Rate" and "Processing Time" volatility expected in Spark Streaming 1.6.0 and is there any known way of improving it?

  • Do you know of any workaround apart from these two?

    1) Guaranteeing that the checkpointing sink writes all its files in under 5 seconds. In my experience, you cannot guarantee that with S3, even for small batches. For a local NAS, it depends on who is in charge of the infrastructure (difficult with cloud providers).

    2) Increasing the spark.streaming.driver.writeAheadLog.batchingTimeout property value.

  • Would you expect to lose any events in the described scenario? I'd think that if batch checkpointing fails, the shard/receiver Sequence Numbers wouldn't be increased and it would be retried at a later time.

Spark 1.5.2 Statistics - Screenshot

Spark 1.6.0 Statistics - Screenshot

Full Stack Trace

16/01/19 03:25:03 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(3521),Some(SequenceNumberRanges(SequenceNumberRange(StreamEventsPRD,shardId-000000000003,49558087746891612304997255299934807015508295035511636018,49558087746891612304997255303224294170679701088606617650), SequenceNumberRange(StreamEventsPRD,shardId-000000000004,49558087949939897337618579003482122196174788079896232002,49558087949939897337618579006984380295598368799020023874), SequenceNumberRange(StreamEventsPRD,shardId-000000000001,49558087735072217349776025034858012188384702720257294354,49558087735072217349776025038332464993957147037082320914), SequenceNumberRange(StreamEventsPRD,shardId-000000000009,49558088270111696152922722880993488801473174525649617042,49558088270111696152922722884455852348849472550727581842), SequenceNumberRange(StreamEventsPRD,shardId-000000000000,49558087841379869711171505550483827793283335010434154498,49558087841379869711171505554030816148032657077741551618), SequenceNumberRange(StreamEventsPRD,shardId-000000000002,49558087853556076589569225785774419228345486684446523426,49558087853556076589569225789389107428993227916817989666))),BlockManagerBasedStoreResult(input-0-1453142312126,Some(3521)))) to the WriteAheadLog.
java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232)
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:87)
    at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:321)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:500)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1230)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:498)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Source Code Extract

...

// Function to create a new StreamingContext and set it up
def setupContext(): StreamingContext = {
  ...

  // Create a StreamingContext
  val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds))

  // Create a Kinesis DStream
  val data = KinesisUtils.createStream(ssc,
    kinesisAppName, kinesisStreamName,
    kinesisEndpointUrl, RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName(),
    InitialPositionInStream.LATEST, Seconds(kinesisCheckpointIntervalSeconds),
    StorageLevel.MEMORY_AND_DISK_SER_2, awsAccessKeyId, awsSecretKey)
  ...

  ssc.checkpoint(checkpointDir)

  ssc
}

// Get or create a streaming context.
val ssc = StreamingContext.getActiveOrCreate(checkpointDir, setupContext)

ssc.start()
ssc.awaitTermination()

Solution

Following zero323's suggestion to post my comment as an answer:

Increasing spark.streaming.driver.writeAheadLog.batchingTimeout solved the checkpointing timeout issue. We did so only after making sure we had room for it, and we have been testing it for a while now, so I only recommend increasing it after careful consideration.

Details

We used these two settings in $SPARK_HOME/conf/spark-defaults.conf:

spark.streaming.driver.writeAheadLog.allowBatching true
spark.streaming.driver.writeAheadLog.batchingTimeout 15000

Originally, we only had spark.streaming.driver.writeAheadLog.allowBatching set to true.

Before the change, we had reproduced the issue mentioned in the question ("...ReceivedBlockTracker: Exception thrown while writing record...") in a testing environment. It occurred every few hours. After the change, the issue disappeared. We ran it for several days before moving to production.
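
For reference, the same two properties could also be set programmatically on the SparkConf used to build the streaming context. This is only a minimal sketch of that alternative (we set them in spark-defaults.conf as shown above); the app name and batch interval are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder batch interval for illustration.
val batchIntervalSeconds = 60

val conf = new SparkConf()
  .setAppName("StreamEventsAggregator")  // hypothetical app name
  // Same WAL settings as in spark-defaults.conf above.
  .set("spark.streaming.driver.writeAheadLog.allowBatching", "true")
  .set("spark.streaming.driver.writeAheadLog.batchingTimeout", "15000")

val ssc = new StreamingContext(conf, Seconds(batchIntervalSeconds))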

We also found that the getBatchingTimeout() method of the WriteAheadLogUtils class has a default value of 5000 ms, as seen here:

def getBatchingTimeout(conf: SparkConf): Long = {
  conf.getLong(DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY, defaultValue = 5000)
}
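
That 5000 default is exactly what shows up as "Futures timed out after [5000 milliseconds]" in the stack trace above. As a quick sanity check that an override is being picked up, the effective value can be read back through SparkConf.getLong (a sketch; new SparkConf() sees the spark-defaults.conf values when the application is launched through spark-submit):

import org.apache.spark.SparkConf

val conf = new SparkConf()
val effectiveTimeoutMs =
  conf.getLong("spark.streaming.driver.writeAheadLog.batchingTimeout", defaultValue = 5000)
println(s"WAL batching timeout: $effectiveTimeoutMs ms")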
