Setting spark.speculation in Spark 2.1.0 while writing to s3

Problem description

I am running a large Spark 2.1.0 job that ends by writing its results to S3. It runs on a 30-node cluster and for the most part works fine. However, occasionally I have to stop the job and run it again, because a single node gets stuck while writing even after all the computation is done. I am wondering whether I can mitigate this issue by turning speculation on. I read in another post that this may be harmful and lead to duplicate results or data corruption. Can anyone advise? I was also advised to use the default Hadoop committer by specifying the following setting in my spark-defaults.conf. I am running Spark standalone.

 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
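For context, a sketch of what my spark-defaults.conf would look like with both the committer setting above and speculation turned on; the quantile/multiplier tuning values are purely illustrative, not something I am actually running:

 spark.speculation true
 spark.speculation.quantile 0.90
 spark.speculation.multiplier 2
 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2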

Any clarification on this issue would be greatly appreciated.

Recommended answer

Update: If you use AWS Elastic MapReduce, clusters with version >= 5.19 can now safely use speculative execution, but your Spark job can still fail part-way through and leave incomplete results.

The partial data from your incomplete results is queryable if you are directly scanning AWS S3, which can lead to incorrect results for downstream jobs, so you need a strategy to deal with that!

If you are running Spark 2.3.0 or greater, I would recommend writing new partitions to a deterministic location using SaveMode.Overwrite and retrying on failure; this avoids duplicate or corrupt data in your output.

If you are using SaveMode.Append then retrying a Spark job will produce duplicate data in your output.

The recommended approach:

df.write
  .mode(SaveMode.Overwrite)
  .partitionBy("date")
  .parquet("s3://myBucket/path/to/table.parquet")

Then on successful writing of a partition, atomically register it to a metastore such as Hive, and query Hive as your source of truth, not S3 directly.

For example:

ALTER TABLE my_table ADD PARTITION (date='2019-01-01') location 's3://myBucket/path/to/table.parquet/date=2019-01-01'

If your Spark job fails and you are using SaveMode.Overwrite, then it is always safe to retry, because the data has not been made available to metastore queries and you are only overwriting data in the failed partition.

Note: In order to overwrite only specific partitions rather than the entire dataset, you need to configure:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

which is only available from Spark 2.3.0.
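Putting the pieces above together, here is a minimal Scala sketch (assuming a Hive-enabled SparkSession bound to the usual spark variable; the table name, bucket path, and helper function are illustrative placeholders, not from the original answer) of writing one date partition with SaveMode.Overwrite, retrying on failure, and only registering the partition in the metastore once the write has fully succeeded:

import org.apache.spark.sql.{DataFrame, SaveMode}
import scala.util.{Failure, Success, Try}

// Illustrative placeholders: adjust to your own table and bucket.
val tablePath = "s3://myBucket/path/to/table.parquet"
val tableName = "my_table"

// Only overwrite the partitions present in the DataFrame, not the whole dataset (Spark 2.3.0+).
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

def writeAndRegister(df: DataFrame, date: String, attempts: Int = 3): Unit = {
  Try {
    df.write
      .mode(SaveMode.Overwrite)
      .partitionBy("date")
      .parquet(tablePath)
  } match {
    case Success(_) =>
      // The data only becomes visible to downstream queries once the partition is registered.
      spark.sql(
        s"ALTER TABLE $tableName ADD IF NOT EXISTS PARTITION (date='$date') " +
          s"LOCATION '$tablePath/date=$date'")
    case Failure(_) if attempts > 1 =>
      // Safe to retry: nothing was registered yet, and Overwrite replaces the failed partition's files.
      writeAndRegister(df, date, attempts - 1)
    case Failure(e) =>
      throw e
  }
}

Because nothing is registered until the S3 write succeeds, re-running writeAndRegister for the same date after a failure is idempotent.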

https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-s3-optimized-committer.html

You might also want to consider the Iceberg project as an alternative to a Hive / Glue metastore as it matures. https://github.com/apache/incubator-iceberg

Why this is needed, and for non-AWS users

Running with Spark speculation on when committing to an object store is usually a VERY bad idea, depending on what is looking at that data downstream and your consistency model.

Ryan Blue from Netflix has an excellent (and pretty funny) talk which explains exactly why: https://www.youtube.com/watch?v=BgHrff5yAQo

Judging by the OP's description I suspect they are writing Parquet.

The TL;dr version is that in S3, a rename operation is actually a copy and delete under the hood, and this has consistency implications. Usually in Spark, output data is written to a temporary file location and renamed when the calculation is complete. This means that if speculative execution is on, multiple executors can be working on the same result, and the one that finishes first 'wins' by renaming its temp file to the final result while the other task is cancelled. This rename happens in a single task to ensure that only one speculative attempt wins, which is not a problem on HDFS since a rename is a cheap metadata operation; even a few thousand or a million of them take very little time.

But when using S3, a rename is not an atomic operation; it is actually a copy, which takes time. Therefore you can get into a situation where you have to copy a whole bunch of files in S3 a second time for the rename, in series, and this synchronous operation is what causes your slowdown. If your executor has multiple cores, you may actually have one task clobber the results of another, which should in theory be OK because one file ends up winning, but you are not in control of what is happening at that point.

The issue with this is, what happens if the final rename task fails? You end up with some of your files committed to S3 and not all of them, which means partial/duplicate data and lots of problems downstream depending on your application.

While I don't like it, the prevailing wisdom presently is to write locally to HDFS, then upload the data with a tool like S3Distcp.
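A rough sketch of that workaround, with placeholder paths (on EMR the copy step is exposed as the s3-dist-cp command; other distributions package S3DistCp differently):

// Stage the output on HDFS first, where the commit-time renames are cheap metadata operations.
df.write
  .mode(SaveMode.Overwrite)
  .partitionBy("date")
  .parquet("hdfs:///staging/table.parquet")

// Then copy the finished files to S3 in a separate step, e.g. from the shell on EMR:
//   s3-dist-cp --src hdfs:///staging/table.parquet --dest s3://myBucket/path/to/table.parquet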

Have a look at HADOOP-13786. Steve Loughran is the go-to guy for this issue.

If you don't want to wait, Ryan Blue has a repo, rdblue/s3committer, which allows you to fix this for all outputs except Parquet files, but it looks like a bit of work to integrate and subclass correctly.

Update: HADOOP-13786 has now been fixed and released in the Hadoop 3.1 libraries. At present Steve Loughran is working on getting a fix based on the Hadoop 3.1 libs merged into apache/spark (SPARK-23977); however, the latest word on the ticket's comment thread is that the fix will not be merged before Spark 2.4 is released, so we may be waiting a bit longer for this to become mainstream.

Update v2: Note: You can halve the window of time in which the final output partition rename task may fail by setting mapreduce.fileoutputcommitter.algorithm.version to 2 in your Hadoop config, since the original output commit mechanism actually performed two renames.
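If you would rather set this programmatically than in spark-defaults.conf, a minimal sketch (any spark.hadoop.* property is forwarded to the underlying Hadoop configuration; the app name is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("s3-writer")
  // Algorithm version 2 commits task output directly to the final location,
  // dropping the second, job-level rename of the original mechanism.
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()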
