Why Pyspark jobs are dying out in the middle of process without any particular error


Problem Description

Experts, I am noticing one peculiar thing with one of the Pyspark jobs in production (running in YARN cluster mode). After executing for around an hour or more (around 65-75 minutes), it just dies out without throwing any particular error message. We have analyzed the YARN logs for around 2 weeks now and there is no particular error in them; it just dies in the middle while doing ETL operations (reading/writing Hive tables, doing simple maps, trims, lambda operations, etc.), with no particular piece of code to point out. Sometimes rerunning fixes it, sometimes it takes more than one rerun. The code is optimized, and the spark-submit --conf has all the correctly optimized options. As mentioned earlier, it runs absolutely perfectly for around 30 other applications with very good performance stats. These are all the options we have -

spark-submit --conf spark.yarn.maxAppAttempts=1 --conf spark.sql.broadcastTimeout=36000 --conf spark.dynamicAllocation.executorIdleTimeout=1800 --conf spark.dynamicAllocation.minExecutors=8 --conf spark.dynamicAllocation.initialExecutors=8 --conf spark.dynamicAllocation.maxExecutors=32 --conf spark.yarn.executor.memoryOverhead=4096 --conf spark.kryoserializer.buffer.max=512m --driver-memory 2G --executor-memory 8G --executor-cores 2 --deploy-mode cluster --master yarn

We want to check whether there is some driver configuration I need to change to address this issue, or some automatic timeout in Spark cluster mode that can be increased. We are using Spark 1.6 with Python 2.7.

The error looks like the following; there are several messages where it says -

ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM

But it fails when it encounters the driver error (which happens at the end) -

ERROR executor.CoarseGrainedExecutorBackend: Driver XX.XXX.XXX.XXX:XXXXX disassociated! Shutting down

Here are the logs -

19/10/24 16:17:03 INFO compress.CodecPool: Got brand-new compressor [.gz]
19/10/24 16:17:03 INFO output.FileOutputCommitter: Saved output of task 'attempt_201910241617_0152_m_000323_0' to hdfs://myserver/production/out/TBL/_temporary/0/task_201910241617_0152_m_000323
19/10/24 16:17:03 INFO mapred.SparkHadoopMapRedUtil: attempt_201910241617_0152_m_000323_0: Committed
19/10/24 16:17:03 INFO executor.Executor: Finished task 323.0 in stage 152.0 (TID 27419). 2163 bytes result sent to driver
19/10/24 16:17:03 INFO output.FileOutputCommitter: Saved output of task 'attempt_201910241617_0152_m_000135_0' to hdfs://myserver/production/out/TBL/_temporary/0/task_201910241617_0152_m_000135
19/10/24 16:17:03 INFO mapred.SparkHadoopMapRedUtil: attempt_201910241617_0152_m_000135_0: Committed
19/10/24 16:17:03 INFO executor.Executor: Finished task 135.0 in stage 152.0 (TID 27387). 2163 bytes result sent to driver
19/10/24 16:18:04 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
19/10/24 16:18:04 INFO storage.DiskBlockManager: Shutdown hook called
19/10/24 16:18:04 INFO util.ShutdownHookManager: Shutdown hook called

19/10/24 16:21:12 INFO executor.Executor: Finished task 41.0 in stage 163.0 (TID 29954). 2210 bytes result sent to driver
19/10/24 16:21:12 INFO executor.Executor: Finished task 170.0 in stage 163.0 (TID 29986). 2210 bytes result sent to driver
19/10/24 16:21:13 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 30047
19/10/24 16:21:13 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 30079
19/10/24 16:21:13 INFO executor.Executor: Running task 10.0 in stage 165.0 (TID 30047)
19/10/24 16:21:13 INFO executor.Executor: Running task 42.0 in stage 165.0 (TID 30079)
19/10/24 16:21:13 INFO spark.MapOutputTrackerWorker: Updating epoch to 56 and clearing cache
19/10/24 16:21:13 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 210
19/10/24 16:21:13 INFO storage.MemoryStore: Block broadcast_210_piece0 stored as bytes in memory (estimated size 29.4 KB, free 3.8 GB)
19/10/24 16:21:13 INFO broadcast.TorrentBroadcast: Reading broadcast variable 210 took 3 ms
19/10/24 16:21:13 INFO storage.MemoryStore: Block broadcast_210 stored as values in memory (estimated size 83.4 KB, free 3.8 GB)
19/10/24 16:21:13 INFO executor.Executor: Finished task 10.0 in stage 165.0 (TID 30047). 931 bytes result sent to driver
19/10/24 16:21:13 INFO executor.Executor: Finished task 42.0 in stage 165.0 (TID 30079). 931 bytes result sent to driver
19/10/24 16:21:15 WARN executor.CoarseGrainedExecutorBackend: An unknown (rxxxxxx1.hadoop.com:XXXXX) driver disconnected.
19/10/24 16:21:15 ERROR executor.CoarseGrainedExecutorBackend: Driver XX.XXX.XXX.XXX:XXXXX disassociated! Shutting down.
19/10/24 16:21:15 INFO storage.DiskBlockManager: Shutdown hook called
19/10/24 16:21:15 INFO util.ShutdownHookManager: Shutdown hook called

Thanks, Sid

Recommended Answer

Without any apparent stack trace it's a good idea to think of the problem from two angles: it's either a code issue or a data issue.

In either case, you should start by giving the driver abundant memory so as to rule that out as a probable cause. Increase driver.memory and driver.memoryOverhead until you've diagnosed the problem.

Common code issues:

  1. Too many transformations cause the lineage to get too big. If there is any kind of iterative operation happening on the dataframe, then it's a good idea to truncate the DAG by doing a checkpoint in between. In Spark 2.x you can call dataFrame.checkpoint() directly and do not have to access the RDD. @Sagar's answer also describes how to do this for Spark 1.6 (see the checkpoint sketch after this list).

  2. Trying to broadcast dataframes that are too big. This will usually result in an OOM exception, but can sometimes just cause the job to seem stuck. The resolution is to not call broadcast if you are explicitly doing so. Otherwise, check whether you have set spark.sql.autoBroadcastJoinThreshold to some custom value and try lowering that value, or disable broadcast altogether by setting it to -1 (see the broadcast sketch after this list).

  3. Not enough partitions can cause every task to run hot. The easiest way to diagnose this is to check the stages view in the Spark UI and see the size of data being read and written per task. This should ideally be in the 100MB-500MB range. Otherwise, increase spark.sql.shuffle.partitions and spark.default.parallelism to values higher than the default of 200 (see the partitions sketch after this list).
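
The checkpoint sketch, assuming a SparkContext sc, a SQLContext sqlContext and a dataframe df already exist in the job; the HDFS path below is a placeholder, not taken from the original post:

# Spark 2.x (2.1+): checkpoint the dataframe directly.
# spark.sparkContext.setCheckpointDir("hdfs://myserver/tmp/checkpoints")
# df = df.checkpoint()

# Spark 1.6 workaround: checkpoint the underlying RDD and rebuild the dataframe.
sc.setCheckpointDir("hdfs://myserver/tmp/checkpoints")   # placeholder path
rdd = df.rdd
rdd.checkpoint()
rdd.count()                                              # action that materializes the checkpoint
df = sqlContext.createDataFrame(rdd, df.schema)          # lineage now starts at the checkpoint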
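
The broadcast sketch: turning automatic broadcast joins down or off is just a SQL conf change. The sqlContext name, dataframe names and the threshold value are assumptions for illustration:

# Disable automatic broadcast joins entirely (-1 means off; the value is in bytes).
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")

# Or keep them but only for genuinely small tables, e.g. 10 MB.
# sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))

# If the code broadcasts explicitly, remove the hint for large frames:
# from pyspark.sql.functions import broadcast
# result = big_df.join(broadcast(small_df), "key")   # only when small_df is truly small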
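
The partitions sketch; the value 800 is purely illustrative and should be tuned against the task sizes seen in the Spark UI:

# Raise shuffle parallelism above the default of 200.
sqlContext.setConf("spark.sql.shuffle.partitions", "800")

# spark.default.parallelism applies to RDD operations and is read when the
# context is created, so set it at submit time, e.g. --conf spark.default.parallelism=800.

# Quick sanity check on how many partitions a dataframe actually has:
print(df.rdd.getNumPartitions())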

Common data issues:

  1. Data skew. Since your job is failing for a specific workload, it could have data skew in that specific job. Diagnose this by checking that the median time for task completion is comparable to the 75th percentile, which in turn is comparable to the 90th percentile, in the stage view of the Spark UI. There are many ways to redress data skew, but the one I find best is to write a custom join function that salts the join keys prior to the join (see the salted-join sketch after this list). This splits the skewed partition into several smaller partitions at the expense of a constant-size data explosion.

  2. Input file format or number of files. If your input file isn't partitioned and you're only doing narrow transforms (those that do not cause a data shuffle), then all of your data will run through a single executor and not really benefit from the distributed cluster setup. Diagnose this from the Spark UI by checking how many tasks are getting created in each stage of the pipeline; it should be on the order of your spark.default.parallelism value. If not, then do a .repartition(<some value>) immediately after the data read step, prior to any transforms (see the repartition sketch after this list). If the file format is CSV (not ideal), then verify that you have multiLine disabled unless it is required in your specific case; otherwise this forces a single executor to read the entire CSV file.
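
The salted-join sketch, assuming a skewed big_df and a smaller small_df that share a join column; the function name, column names and the number of salt buckets are placeholders:

from pyspark.sql import functions as F

def salted_join(big_df, small_df, key, num_salts=16):
    # Tag each row of the big, skewed side with a random salt bucket.
    big_salted = big_df.withColumn("salt", (F.rand() * num_salts).cast("int"))
    # Replicate the small side once per bucket so every salted key still finds a match.
    all_salts = F.array([F.lit(i) for i in range(num_salts)])
    small_salted = small_df.withColumn("salt", F.explode(all_salts))
    # Join on (key, salt): the hot key is now spread across num_salts tasks.
    return big_salted.join(small_salted, [key, "salt"]).drop("salt")

# Usage (column name is hypothetical):
# joined = salted_join(big_df, small_df, "customer_id")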
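
The repartition sketch: spread a poorly partitioned input across the cluster immediately after the read and before any transforms. Paths, table names and the partition count are placeholders:

# Read, then repartition right away so later narrow transforms run in parallel.
df = sqlContext.read.parquet("hdfs://myserver/production/in/TBL")    # placeholder path
df = df.repartition(200)    # pick something on the order of spark.default.parallelism

# A Hive-table read can be treated the same way:
# df = sqlContext.table("my_db.my_table").repartition(200)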

Happy debugging!
