What is the best practice for writing a massive number of files to S3 using Spark?


Problem Description


I'm trying to write about 30k-60k Parquet files to S3 using Spark, and it's taking a massive amount of time (40+ minutes) due to the S3 rate limit. I wonder if there is a best practice for doing such a thing. I heard that writing the data to HDFS and then copying it using s3-dist-cp may be faster, but I can't understand why. Won't the copy from HDFS take the same amount of time because of the S3 rate limit?

Thanks for your help

Solution

There is nothing wrong with this approach, and it works absolutely fine in most use cases, but there can be some challenges due to the way files are written to S3.

Two Important Concepts to Understand

  1. S3 (Object Store) != POSIX File System: the Rename Operation

    A file rename in a POSIX-based file system is a metadata-only operation: only the pointer changes, and the file stays as-is on disk. For example, if I have a file abc.txt and rename it to xyz.txt, the operation is instantaneous and atomic, and xyz.txt's last-modified timestamp remains the same as abc.txt's. In AWS S3 (an object store), by contrast, a file rename is under the hood a copy followed by a delete: the source object is first copied to the destination and then deleted. That is why "aws s3 mv" changes the last-modified timestamp of the destination file, unlike a POSIX file system. The metadata here is a key-value store where the key is the object path and the value is the object's content, and there is no operation that simply changes the key and completes immediately. The rename time therefore depends on the size of the file, and if a directory is renamed (there is no real directory in S3; for simplicity we can treat a recursive set of objects under a prefix as a directory), it depends on the number of files inside the directory as well as the size of each file. In a nutshell, rename is a very expensive operation in S3 compared to a normal file system (see the short boto3 sketch after this list).

  2. S3 Consistency Model

    S3 comes with two kinds of consistency: (a) read-after-write and (b) eventual consistency, which in some cases results in file-not-found exceptions: files that have been added but are not yet listed, or files that have been deleted but are not yet removed from the listing.
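
To make the first concept concrete, here is a minimal sketch, assuming the boto3 library and hypothetical bucket and key names, of what an S3 "rename" actually does under the hood: a full copy of the object followed by a delete of the source, so the cost grows with object size (and, for a prefix, with the number of objects), unlike a POSIX rename.

    import boto3

    s3 = boto3.client("s3")  # assumes AWS credentials are already configured

    def s3_rename(bucket: str, src_key: str, dst_key: str) -> None:
        """S3 has no rename API: a "rename" is a copy of the whole object
        followed by a delete of the source, so it is not metadata-only."""
        s3.copy_object(
            Bucket=bucket,
            Key=dst_key,
            CopySource={"Bucket": bucket, "Key": src_key},
        )
        s3.delete_object(Bucket=bucket, Key=src_key)

    # "Renaming a directory" means repeating this for every object under the
    # prefix, so the cost scales with the number of files and their sizes.
    s3_rename("my-bucket", "staging/abc.txt", "final/xyz.txt")  # hypothetical keys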

A deeper explanation:

Spark leverages Hadoop's "FileOutputCommitter" implementations to write data. Writing data involves multiple steps: at a high level, output files are staged and then committed, i.e. the final files are written. As I mentioned earlier, the rename step happens here, when output moves from the staging location to its final location. As you know, a Spark job is divided into stages and sets of tasks, and due to the nature of distributed computing tasks are prone to failure, so there is also a provision to re-launch the same task after a system failure or to speculatively execute slow-running tasks; this leads to the concepts of task commit and job commit functions. There are two readily available algorithms for how job and task commits are done, and, having said that, neither algorithm is better than the other in general; the right choice depends on where we are committing the data.
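
For reference, here is a minimal sketch of how the committer algorithm version can be selected when building a Spark session; the spark.hadoop.* prefix simply forwards the property to the Hadoop configuration, and the app name and output path below are placeholders.

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .appName("s3-write-example")  # placeholder name
        # Forwarded to Hadoop: pick FileOutputCommitter algorithm 1 or 2,
        # whose trade-offs are described below.
        .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
        .getOrCreate()
    )

    df = spark.range(1000)
    df.write.mode("overwrite").parquet("s3a://my-bucket/output/")  # placeholder path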

mapreduce.fileoutputcommitter.algorithm.version=1

  • commitTask renames the data generated by a task from the task temporary directory to the job temporary directory.

  • When all tasks are complete, commitJob renames all the data from the job temporary directory to the final destination and, at the end, creates the _SUCCESS file.

Here the driver does the work of commitJob at the end, so object stores like S3 may take a long time because a large number of task temporary files are queued up for rename operations (though not serially), and write performance is not optimized. It can work pretty well for HDFS, where a rename is not expensive and is just a metadata change. For AWS S3, during commitJob each file rename issues API calls to AWS S3, and if the number of files is high this can cause unexpected API failures or throttling. Or it might not; I have seen both outcomes for the same job run at two different times.

mapreduce.fileoutputcommitter.algorithm.version=2

  • commitTask moves the data generated by a task from the task temporary directory directly to the final destination as soon as the task completes.

  • commitJob basically writes the _SUCCESS file and doesn't do much else.

From a high level this looks optimized, but it comes with limitations: speculative task execution cannot be used, and if any task fails, for example due to corrupt data, we may end up with residual data in the final destination that needs a clean-up. So this algorithm does not give 100% data correctness, and it does not work for use cases where we need to append data to existing files; even though it gives optimized results, it comes with this risk. The reason for the good performance is basically the smaller number of rename operations compared to algorithm 1 (there are still renames). Here we may also encounter file-not-found exceptions, because commitTask writes files to a temporary path and immediately renames them, so there is a slight chance of eventual-consistency issues.
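
Because both algorithms create the _SUCCESS marker only at the end of commitJob, one hedge against the residual-data risk described above is to have downstream consumers check for that marker before reading an output prefix. A minimal sketch, again assuming boto3 and hypothetical bucket and prefix names:

    import boto3
    from botocore.exceptions import ClientError

    s3 = boto3.client("s3")

    def job_committed(bucket: str, output_prefix: str) -> bool:
        """True only if the job's _SUCCESS marker exists, i.e. commitJob
        finished; otherwise the prefix may contain partial or leftover data."""
        key = output_prefix.rstrip("/") + "/_SUCCESS"
        try:
            s3.head_object(Bucket=bucket, Key=key)
            return True
        except ClientError as err:
            if err.response["Error"]["Code"] in ("404", "NoSuchKey", "NotFound"):
                return False
            raise

    # Hypothetical usage:
    # if job_committed("my-bucket", "warehouse/events/dt=2020-01-01"):
    #     df = spark.read.parquet("s3a://my-bucket/warehouse/events/dt=2020-01-01")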

Best Practices

Here are a few practices I think we can use while writing Spark data processing applications:

  • If you have an HDFS cluster available, then write the data from Spark to HDFS and copy it to S3 for persistence. s3-dist-cp can copy data from HDFS to S3 optimally, and this way we avoid all of the rename operations on S3. With AWS EMR clusters that run only for the duration of the computation and are then terminated while the results are persisted, this approach looks preferable (see the sketch after this list).

  • Try to avoid writing files and reading them again and again unless there are consumers for the files. Spark is well known for in-memory processing, and careful persistence/caching of data in memory will help optimize the application's run time.
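
A sketch of the first practice, with hypothetical paths and assuming an EMR cluster where s3-dist-cp is available: write the Parquet output to HDFS from Spark, where renames are cheap, then copy the finished files to S3 in one pass.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("hdfs-then-s3").getOrCreate()  # placeholder name

    df = spark.read.parquet("s3a://my-bucket/input/")             # placeholder input
    df.write.mode("overwrite").parquet("hdfs:///staging/output")  # commit renames stay on HDFS

    # Then copy the already-committed files to S3, e.g. as an EMR step or from
    # the master node; no per-file rename happens on the S3 side:
    #
    #   s3-dist-cp --src hdfs:///staging/output --dest s3://my-bucket/final/output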
