DataFrame partitionBy to a single Parquet file (per partition)

Problem Description

I would like to repartition / coalesce my data so that it is saved into one Parquet file per partition. I would also like to use the Spark SQL partitionBy API. So I could do that like this:

import org.apache.spark.sql.SaveMode

// Collapse the DataFrame to a single partition before writing, so that
// every output partition directory ends up with exactly one Parquet file.
df.coalesce(1)
    .write
    .partitionBy("entity", "year", "month", "day", "status")
    .mode(SaveMode.Append)
    .parquet(s"$location")

I've tested this and it doesn't seem to perform well. This is because there is only one partition to work on in the dataset and all the partitioning, compression and saving of files has to be done by one CPU core.
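
For example, the bottleneck can be seen by checking the partition count after the coalesce (a minimal sketch):

// After coalesce(1) the DataFrame has a single partition, so the entire
// write (partitioning, compression, saving) runs as one task on one core.
println(df.coalesce(1).rdd.getNumPartitions)  // prints 1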

I could rewrite this to do the partitioning manually (using filter with the distinct partition values for example) before calling coalesce.
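
For illustration, that manual approach might look roughly like this (a sketch only; it assumes the same five partition columns and that collecting the distinct partition values to the driver is acceptable):

import org.apache.spark.sql.SaveMode
import spark.implicits._

// Sketch of the manual alternative: collect the distinct partition values,
// then write each slice separately with coalesce(1) so it becomes one file.
val partitionValues = df.select("entity", "year", "month", "day", "status").distinct().collect()

partitionValues.foreach { row =>
  df.filter($"entity" === row.getAs[Any]("entity") &&
            $"year"   === row.getAs[Any]("year") &&
            $"month"  === row.getAs[Any]("month") &&
            $"day"    === row.getAs[Any]("day") &&
            $"status" === row.getAs[Any]("status"))
    .coalesce(1)
    .write
    .partitionBy("entity", "year", "month", "day", "status")
    .mode(SaveMode.Append)
    .parquet(s"$location")
}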

But is there a better way to do this using the standard Spark SQL API?

Recommended Answer

I had the exact same problem and I found a way to do this using DataFrame.repartition(). The problem with using coalesce(1) is that your parallelism drops to 1, and it can be slow at best and error out at worst. Increasing that number doesn't help either -- if you do coalesce(10) you get more parallelism, but end up with 10 files per partition.

To get one file per partition without using coalesce(), use repartition() with the same columns you want the output to be partitioned by. So in your case, do this:

import spark.implicits._
df.repartition($"entity", $"year", $"month", $"day", $"status")
  .write.partitionBy("entity", "year", "month", "day", "status")
  .mode(SaveMode.Append).parquet(s"$location")

Once I do that I get one parquet file per output partition, instead of multiple files.
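
If you want to verify this, one way (a sketch; the path and partition values below are only placeholders) is to list the part files under a single partition directory:

import org.apache.hadoop.fs.{FileSystem, Path}

// Count the Parquet part files in one (hypothetical) partition directory;
// after the repartition + partitionBy write there should be exactly one.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val dir = new Path(s"$location/entity=acme/year=2016/month=7/day=1/status=OK")
val parts = fs.listStatus(dir).map(_.getPath.getName).filter(_.endsWith(".parquet"))
println(parts.length)  // expected: 1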

I tested this in Python, but I assume in Scala it should be the same.
