DataFrame partitionBy to a single Parquet file (per partition)

Question

I would like to repartition / coalesce my data so that it is saved into one Parquet file per partition. I would also like to use the Spark SQL partitionBy API. So I could do that like this:

df.coalesce(1)
    .write
    .partitionBy("entity", "year", "month", "day", "status")
    .mode(SaveMode.Append)
    .parquet(s"$location")

I've tested this and it doesn't seem to perform well. This is because there is only one partition to work on in the dataset, and all the partitioning, compression and saving of files has to be done by one CPU core.

I could rewrite this to do the partitioning manually (using filter with the distinct partition values, for example) before calling coalesce, as sketched below.
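A rough sketch of that manual approach (illustrative only: partCols, partValues and predicate are made-up names, and it assumes the set of distinct partition values is small enough to collect to the driver):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

val partCols = Seq("entity", "year", "month", "day", "status")

// Collect the distinct partition-column combinations to the driver.
val partValues = df.select(partCols.map(col): _*).distinct().collect()

// Write each combination separately, so each write only handles one partition's data.
partValues.foreach { row =>
  val predicate = partCols.zipWithIndex
    .map { case (c, i) => col(c) === row.get(i) }
    .reduce(_ && _)

  df.filter(predicate)
    .coalesce(1)
    .write
    .partitionBy(partCols: _*)
    .mode(SaveMode.Append)
    .parquet(s"$location")
}

Each write then only touches one partition's worth of data, but the loop launches one Spark job per partition value, which is exactly the extra boilerplate I'd like to avoid.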

But is there a better way to do this using the standard Spark SQL API?

Answer

I had the exact same problem and I found a way to do this using DataFrame.repartition(). The problem with using coalesce(1) is that your parallelism drops to 1, so at best it is slow and at worst it errors out. Increasing that number doesn't help either: if you do coalesce(10) you get more parallelism, but end up with 10 files per partition.

To get one file per partition without using coalesce(), use repartition() with the same columns you want the output to be partitioned by. So in your case, do this:

import spark.implicits._

df.repartition($"entity", $"year", $"month", $"day", $"status")
    .write
    .partitionBy("entity", "year", "month", "day", "status")
    .mode(SaveMode.Append)
    .parquet(s"$location")

Once I do that, I get one Parquet file per output partition instead of multiple files.
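For reference, the output then has the usual Hive-style directory layout, with a single part file under each partition directory (the paths and values below are purely illustrative):

$location/entity=a/year=2018/month=1/day=1/status=OK/part-00000-....snappy.parquet
$location/entity=a/year=2018/month=1/day=2/status=OK/part-00000-....snappy.parquet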

I tested this in Python, but I assume it should be the same in Scala.
