Spark stateful streaming job hangs at checkpointing to S3 after long uptime

Problem Description

I've recently been stress testing our Spark Streaming app. The stress test ingests about 20,000 messages/sec into Kafka, with message sizes varying between 200 bytes and 1 KB, and Spark Streaming reads a batch every 4 seconds.

Our Spark cluster runs version 1.6.1 with the standalone cluster manager, and we're using Scala 2.10.6 for our code.
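
For context, here is a minimal sketch (not our actual app) of the kind of stateful job under test; the S3 bucket, Kafka broker, topic name and state logic are made up for illustration:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object StatefulStressTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("stateful-stress-test")
    // 4 second batches, as described above
    val ssc = new StreamingContext(conf, Seconds(4))
    // checkpoint directory on S3 via the s3n:// (NativeS3FileSystem) scheme
    ssc.checkpoint("s3n://some-bucket/checkpoints")

    val kafkaParams = Map("metadata.broker.list" -> "kafka-broker:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))

    // keep a running count per key; a DStream produced by updateStateByKey must be checkpointed
    val counts = messages
      .map { case (key, _) => (key, 1L) }
      .updateStateByKey[Long]((values: Seq[Long], state: Option[Long]) =>
        Some(state.getOrElse(0L) + values.sum))

    // checkpoint the state every 40 seconds, matching the interval mentioned below
    counts.checkpoint(Seconds(40))
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}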

After about a 15-20 hour run, one of the executors initiating a checkpoint (done at a 40 second interval) gets stuck with the following stack trace and never completes:

java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
java.net.SocketInputStream.read(SocketInputStream.java:170)
java.net.SocketInputStream.read(SocketInputStream.java:141)
sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593)
sun.security.ssl.InputRecord.read(InputRecord.java:532)
sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1375)
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1403)
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1387)
org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:533)
org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:401)
org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:177)
org.apache.http.impl.conn.AbstractPoolEntry.open(AbstractPoolEntry.java:144)
org.apache.http.impl.conn.AbstractPooledConnAdapter.open(AbstractPooledConnAdapter.java:131)
org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:610)
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:445)
org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:326)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:277)
org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1038)
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2250)
org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2179)
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174)
sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:497)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
org.apache.hadoop.fs.s3native.$Proxy18.retrieveMetadata(Unknown Source)
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:472)
org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1424)
org.apache.spark.rdd.ReliableCheckpointRDD$.writePartitionToCheckpointFile(ReliableCheckpointRDD.scala:168)
org.apache.spark.rdd.ReliableCheckpointRDD$$anonfun$writeRDDToCheckpointDirectory$1.apply(ReliableCheckpointRDD.scala:136)
org.apache.spark.rdd.ReliableCheckpointRDD$$anonfun$writeRDDToCheckpointDirectory$1.apply(ReliableCheckpointRDD.scala:136)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
org.apache.spark.scheduler.Task.run(Task.scala:89)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

While stuck, the Spark driver refuses to continue processing incoming batches and builds up a huge backlog of queued batches that can't be processed until the "stuck" task is released.

Furthermore, the driver thread dump for streaming-job-executor-0 clearly shows that it is waiting for this task to complete:

java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:502)
org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1922)
org.apache.spark.rdd.ReliableCheckpointRDD$.writeRDDToCheckpointDirectory(ReliableCheckpointRDD.scala:135)
org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:58)
org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:74)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply$mcV$sp(RDD.scala:1682)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1679)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1679)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1678)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$1.apply(RDD.scala:1684)
org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$1.apply(RDD.scala:1684)
scala.collection.immutable.List.foreach(List.scala:318)

Has anyone experienced such an issue?

Recommended Answer

The socket hang happens due to a bug in the HttpClient library used by org.jets3t where the SSL handshake doesn't use the specified timeout. You can find the issue details here.

This bug exists in HttpClient versions prior to v4.5.1, where it was fixed. Unfortunately, Spark 1.6.x uses v4.3.2, which doesn't include the fix.

There are three possible workarounds I've thought of so far:

  1. Use Spark's speculation mechanism via the spark.speculation configuration settings. This helps with the hang's edge cases, since it reproduces rarely and only under load. Note that this can cause some false positives at the beginning of the streaming job, when Spark doesn't yet have a good estimate of how long your median task runs, but it is definitely not something that causes a noticeable lag.

The documentation says:

If set to "true", performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.

You turn it on by supplying these flags to spark-submit:

spark-submit  \
--conf "spark.speculation=true" \
--conf "spark.speculation.multiplier=5" \

For more on the different settings you can pass, see the Spark Configuration page.
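
The same settings can also be set programmatically when you build the SparkConf (a minimal sketch; the app name is made up):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("stateful-stress-test")
  // enable speculative re-launching of slow tasks
  .set("spark.speculation", "true")
  // only speculate tasks running 5x slower than the stage's median task
  .set("spark.speculation.multiplier", "5")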

  2. Manually pass HttpClient v4.5.1 or above on Spark's classpath, so it loads this JAR prior to the one in its uber JAR. This can be a little difficult, as the class loading process in Spark is a bit cumbersome. In practice you can do something along the lines of:

CP=''; for f in /path/to/httpcomponents-client-4.5.2/lib/*.jar; do CP=$CP$f:; done
SPARK_CLASSPATH="$CP" sbin/start-master.sh   # on your master machine
SPARK_CLASSPATH="$CP" sbin/start-slave.sh 'spark://master_name:7077'   # on each worker machine

Or simply add the specific version of the JAR to SPARK_CLASSPATH in spark-env.sh.

  3. Update to Spark 2.0.0. The new version of Spark uses HttpClient v4.5.2, which resolves this issue.
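
If you go this route, the change is mostly a dependency bump. A build.sbt sketch, assuming an sbt build (artifact names are the standard Spark ones):

// Spark 2.0.0 is built against Scala 2.11 by default, so the Scala version moves up from 2.10.6
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  // core and streaming are provided by the cluster at runtime
  "org.apache.spark" %% "spark-core"      % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.0.0" % "provided",
  // the Kafka integration artifact was renamed in 2.0 to include the Kafka API version
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0"
)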
