Spark parquet partitioning: Large number of files


Question

I am trying to leverage Spark partitioning. I was trying to do something like

data.write.partitionBy("key").parquet("/location")

The issue here is that each partition creates a huge number of parquet files, which results in slow reads if I try to read from the root directory.

To avoid that I tried

data.coalesce(numPart).write.partitionBy("key").parquet("/location")

This however creates numPart parquet files in each partition. Now my partitions are of different sizes, so I would ideally like to have a separate coalesce per partition. That doesn't look like an easy thing, though: I would need to visit every partition, coalesce it to a certain number of files, and store it at a separate location.

How should I use partitioning to avoid ending up with many files after the write?

Answer

First, I would really avoid using coalesce, as it is often pushed further up the chain of transformations and may destroy the parallelism of your job (I asked about this issue here: Coalesce reduces parallelism of entire stage (spark)).

Writing 1 file per parquet partition is relatively easy (see Spark dataframe write method writing many small files):

data.repartition($"key").write.partitionBy("key").parquet("/location")

If you want to set an arbitrary number of files (or files which all have the same size), you need to further repartition your data using another attribute that can be used for this (I cannot tell you what that might be in your case):

data.repartition($"key",$"another_key").write.partitionBy("key").parquet("/location")

another_key could be another attribute of your dataset, or a derived attribute using some modulo or rounding operation on existing attributes. You could even use a window function with row_number over key and then round it with something like

data.repartition($"key",floor($"row_number"/N)*N).write.partitionBy("key").parquet("/location")

This would put N records into 1 parquet file.
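The $"row_number" column in that snippet does not exist yet; it has to be created with a window function first. Below is a minimal sketch of the full pipeline, assuming your dataset has some column that can be used for ordering within each key (called id here as a placeholder) and N is the desired number of records per file:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, floor, row_number}

val N = 10000  // illustrative target number of records per output file
val w = Window.partitionBy("key").orderBy("id")  // "id" is a placeholder ordering column

data
  .withColumn("row_number", row_number().over(w))             // 1, 2, 3, ... within each key
  .repartition(col("key"), floor(col("row_number") / N) * N)  // shuffle every block of N consecutive rows of a key together
  .drop("row_number")                                         // keep the helper column out of the written data
  .write
  .partitionBy("key")
  .parquet("/location")

The exact file counts still depend on how the (key, bucket) groups hash across shuffle partitions, so treat this as a starting point rather than a guarantee.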

Using orderBy

You can also control the number of files without repartitioning by ordering your dataframe accordingly:

data.orderBy($"key").write.partitionBy("key").parquet("/location")

This will lead to a total of (at least, but not much more than) spark.sql.shuffle.partitions files (200 by default) across all partitions. It is even beneficial to add a second ordering column after $"key", as parquet will remember the ordering of the dataframe and will write its statistics accordingly. For example, you can order by an ID:

data.orderBy($"key",$"id").write.partitionBy("key").parquet("/location")

This will not change the number of files, but it will improve the performance when you query your parquet files for a given key and id. See e.g. https://www.slideshare.net/RyanBlue3/parquet-performance-tuning-the-missing-guide and https://db-blog.web.cern.ch/blog/luca-canali/2017-06-diving-spark-and-parquet-workloads-example
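To illustrate why the sort helps, here is a hypothetical read-side query (the column names key and id and the filter values are just placeholders). Because the files were written sorted, Parquet's per-row-group min/max statistics on id are tight, so most row groups can be skipped:

import org.apache.spark.sql.functions.col

// assumes an existing SparkSession named spark
spark.read
  .parquet("/location")
  .filter(col("key") === "some_key" && col("id") === 1234)  // partition pruning on key, row-group skipping on id
  .show()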

Spark 2.2+

From Spark 2.2 on, you can also play with the new option maxRecordsPerFile to limit the number of records per file if your files are too large. You will still get at least N files if you have N partitions, but you can split the file written by 1 partition (task) into smaller chunks:

df.write
  .option("maxRecordsPerFile", 10000)
  ...

See e.g. http://www.gatorsmile.io/anticipated-feature-in-spark-2-2-max-records-written-per-file/ and Spark write to disk with N files less than N partitions
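For completeness, a minimal sketch that combines maxRecordsPerFile with the repartition/partitionBy approach from above; the 10000 threshold is only an illustrative value:

// assumes import spark.implicits._ for the $ syntax, as in the snippets above
data
  .repartition($"key")                    // one task per key writes its key directory
  .write
  .option("maxRecordsPerFile", 10000)     // roll over into a new part file after 10000 records
  .partitionBy("key")
  .parquet("/location")

Spark still writes at least one file per key directory per task; maxRecordsPerFile only caps how large each of those files can get.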
