Multiple Spark jobs appending Parquet data to the same base path with partitioning

Problem description

I have multiple jobs that I want to execute in parallel, each appending daily data into the same path using partitioning.

For example:

dataFrame.write()
         .partitionBy("eventDate", "category")
         .mode(SaveMode.Append)
         .parquet("s3://bucket/save/path");

Job 1 - category = "billing_events"
Job 2 - category = "click_events"

Both of these jobs truncate any existing partitions in the S3 bucket prior to execution, then save the resulting Parquet files to their respective partitions.

Job 1 -> s3://bucket/save/path/eventDate=20160101/channel=billing_events
Job 2 -> s3://bucket/save/path/eventDate=20160101/channel=click_events

The problem I'm facing is the temporary files that Spark creates during job execution. It writes its in-progress working files under the base path

s3://bucket/save/path/_temporary/...

so both jobs end up sharing the same temp folder and conflict with each other. I've noticed this can cause one job to delete the other's temp files, and the other job then fails with a 404 from S3 because an expected temp file no longer exists.

Has anyone faced this issue and come up with a strategy for executing jobs in parallel under the same base path?

I'm using Spark 1.6.0 for now.

Recommended answer

So after much reading about how to tackle this problem, I thought I'd bring some of that wisdom back here to wrap things up. Thanks mostly to Tal's comments.

I've additionally found that writing directly to s3://bucket/save/path seems dangerous, because if a job is killed and the cleanup of the temporary folder doesn't happen at the end of the job, the leftovers are still sitting there for the next job. I've noticed that a previously killed job's temp files sometimes land in s3://bucket/save/path and cause duplication... totally unreliable...

Additionally, renaming the files in the _temporary folder to their final S3 locations takes a horrendous amount of time (roughly 1 second per file), because S3 only supports copy/delete, not a true rename. On top of that, only the driver instance renames these files, and it does so with a single thread, so for jobs with large numbers of files/partitions as much as a fifth of the run time can be spent just waiting on rename operations.

I've ruled out using the DirectOutputCommitter for a number of reasons:

  1. When used in conjunction with speculation mode, it results in duplication (https://issues.apache.org/jira/browse/SPARK-9899).
  2. Task failures leave behind clutter that is impossible to find and remove/clean up later.
  3. Spark 2.0 removes support for it completely, and no upgrade path exists (https://issues.apache.org/jira/browse/SPARK-10063).

The only safe, performant, and consistent way to execute these jobs is to save them first to a unique temporary folder in HDFS (unique by applicationId or timestamp), and then copy that output to S3 on job completion.

This allows concurrent jobs to execute, since each saves to its own unique temp folder; there is no need for the DirectOutputCommitter, because the rename operation on HDFS is faster than on S3; and the saved data ends up more consistent.
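
As a rough sketch of that staging approach (my own illustration, not code from the original answer): it assumes Spark 1.6-era Scala APIs, a dataFrame and sqlContext already in scope, an arbitrary hdfs:///tmp/staging/<applicationId> layout, and Hadoop's FileUtil.copy for the final copy step; a distcp-style tool such as S3DistCp is another common choice for that last step.

    import java.net.URI
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
    import org.apache.spark.sql.SaveMode

    // 1. Write the partitioned output to a job-unique HDFS staging folder first,
    //    so concurrent jobs never share a _temporary directory on S3.
    val sc          = sqlContext.sparkContext
    val stagingPath = s"hdfs:///tmp/staging/${sc.applicationId}"   // assumed layout
    val finalPath   = "s3://bucket/save/path"

    dataFrame.write
      .partitionBy("eventDate", "category")
      .mode(SaveMode.Append)
      .parquet(stagingPath)

    // 2. On success, copy each staged file to the same relative location under
    //    the shared S3 base path, then remove the HDFS staging folder.
    val hadoopConf = sc.hadoopConfiguration
    val srcFs   = FileSystem.get(new URI(stagingPath), hadoopConf)
    val dstFs   = FileSystem.get(new URI(finalPath), hadoopConf)
    val srcRoot = new Path(stagingPath).toUri.getPath
    val files   = srcFs.listFiles(new Path(stagingPath), /* recursive = */ true)
    while (files.hasNext) {
      val file     = files.next()
      val relative = file.getPath.toUri.getPath.stripPrefix(srcRoot).stripPrefix("/")
      FileUtil.copy(srcFs, file.getPath, dstFs, new Path(finalPath, relative),
                    /* deleteSource = */ false, hadoopConf)
    }
    srcFs.delete(new Path(stagingPath), /* recursive = */ true)

Copying file by file preserves the partition directory layout under the existing base path, while the per-job applicationId in the staging path is what keeps concurrent jobs from interfering with each other.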
