Why does Spark running in Google Dataproc store temporary files on external storage (GCS) instead of local disk or HDFS while using saveAsTextFile?

Question

I have run the following PySpark code:

from pyspark import SparkContext

sc = SparkContext()

data = sc.textFile('gs://bucket-name/input_blob_path')
# sort_criteria is the asker's key function, defined elsewhere in the job
sorted_data = data.sortBy(lambda x: sort_criteria(x))
sorted_data.saveAsTextFile(
    'gs://bucket-name/output_blob_path',
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec"
)

The job finished successfully. However, during the job execution Spark created many temporary blobs under gs://bucket-name/output_blob_path/_temporary/0/. I realised that removing all these temporary blobs at the end took half of the job execution time, and CPU utilisation was at 1% during this time (a huge waste of resources).

Is there a way to store the temporary files on local disk (or HDFS) instead of GCS? I would still like to persist the final results (the sorted dataset) to GCS.

We were using a Dataproc Spark cluster with 10 worker nodes (16 cores and 60 GB of memory per VM). The volume of the input data was 10 TB.

Answer

The _temporary files you see are likely an artifact of the FileOutputCommitter being used under the hood. Importantly, these temporary blobs are not strictly "temporary" data; they are in fact the completed output data, which only gets "renamed" to the final destination on job completion. The "commit" of these files through rename is actually fast because both the source and destination are on GCS; for that reason there is no way to replace that part of the workflow by placing the temporary files on HDFS and then "committing" them into GCS, because the commit would then require re-plumbing the entire output dataset back out from HDFS into GCS. More specifically, the underlying Hadoop FileOutputFormat classes don't support such an idiom.
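
You can see this for yourself by listing the objects under the _temporary/ prefix while the job is running: the part files there are already fully written. The following is a minimal sketch (not from the original answer) using the google-cloud-storage Python client, with the bucket and path placeholders taken from the question:

# Minimal sketch: inspect the blobs FileOutputCommitter writes under _temporary/.
# Assumes the google-cloud-storage library and default application credentials;
# 'bucket-name' and 'output_blob_path' are the placeholders from the question.
from google.cloud import storage

client = storage.Client()
bucket = client.bucket('bucket-name')

# Each blob listed here is a completed part file for one task attempt; at job
# commit it is only renamed (on GCS: copied and deleted) into the final path.
for blob in bucket.list_blobs(prefix='output_blob_path/_temporary/0/'):
    print(blob.name, blob.size)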

GCS itself is not a real filesystem, but an "object store", and the GCS connector inside Dataproc only mimics HDFS to the best of its ability. One consequence is that deleting a directory full of files actually requires GCS to delete each individual object under the hood, rather than a real filesystem simply unlinking an inode.
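
As a rough illustration (my own sketch, not part of the original answer), "deleting a directory" on GCS boils down to issuing one delete request per object under that prefix, which is essentially what the connector does, in batches:

# Illustration only: on GCS there is no directory inode to unlink, so removing
# gs://bucket-name/output_blob_path/_temporary/ means deleting every object
# under that prefix individually (the GCS connector batches these requests).
from google.cloud import storage

client = storage.Client()
bucket = client.bucket('bucket-name')

for blob in bucket.list_blobs(prefix='output_blob_path/_temporary/'):
    blob.delete()  # one delete call per object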

In practice, if you're hitting this, it probably means that your output is split into too many files anyway, since cleanup does occur in batches of roughly 1000 files at a time. So up to tens of thousands of output files usually shouldn't be noticeably slow. Having too many files also makes later work on those files slower. The easiest fix is usually just to reduce the number of output files whenever possible, for example by using repartition():

from pyspark import SparkContext

sc = SparkContext()

data = sc.textFile('gs://bucket-name/input_blob_path')
sorted_data = data.sortBy(lambda x: sort_criteria(x))
# repartition(1000) caps the output at 1000 files (one per partition)
sorted_data.repartition(1000).saveAsTextFile(
    'gs://bucket-name/output_blob_path',
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec"
)
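
As a rule of thumb (my own assumption, not from the original answer), you can pick the repartition() count by aiming for a target amount of data per output file rather than letting the sort produce tens of thousands of small parts:

# Hypothetical sizing sketch: the 10 TB figure comes from the question; the
# ~10 GB-per-file target is an assumption chosen to land near repartition(1000).
import math

input_bytes = 10 * 1024**4             # ~10 TB of input data
target_bytes_per_file = 10 * 1024**3   # assumed ~10 GB of input per output file
num_partitions = math.ceil(input_bytes / target_bytes_per_file)
print(num_partitions)                  # -> 1024, close to the 1000 used above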
