Load parquet file and keep same number of HDFS partitions


Question

I have a parquet file /df saved in hdfs with 120 partitions. The size of each partition on hdfs is around 43.5 M.

Total size:

hdfs dfs -du -s -h /df
5.1 G  15.3 G  /df

hdfs dfs -du -h /df
43.6 M  130.7 M  /df/pid=0
43.5 M  130.5 M  /df/pid=1
...
43.6 M  130.9 M  /df/pid=119

I want to load that file into Spark and keep the same number of partitions. However, Spark will automatically load the file into 60 partitions.

df = spark.read.parquet('df')
df.rdd.getNumPartitions()

60

HDFS settings:

'parquet.block.size' is not set.

sc._jsc.hadoopConfiguration().get('parquet.block.size')

returns nothing.

'dfs.blocksize' is set to 128 M.

float(sc._jsc.hadoopConfiguration().get("dfs.blocksize"))/2**20

returns

128

Changing either of those values to something lower does not result in the parquet file loading into the same number of partitions that are in hdfs.

For example:

sc._jsc.hadoopConfiguration().setInt("parquet.block.size", 64*2**20)
sc._jsc.hadoopConfiguration().setInt("dfs.blocksize", 64*2**20)

I realize 43.5 M is well below 128 M. However, for this application, I am going to immediately complete many transformations that will result in each of the 120 partitions getting much closer to 128 M.

I am trying to save myself having to repartition in the application immediately after loading.

Is there a way to force Spark to load the parquet file with the same number of partitions that are stored on the hdfs?

Answer

I can use the spark.sql.files.maxPartitionBytes property to keep the partition sizes where I want when importing.

The Other Configuration Options documentation for the spark.sql.files.maxPartitionBytes property states:

The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

Example (where spark is a working SparkSession):

spark.conf.set("spark.sql.files.maxPartitionBytes", 67108864) ## 64Mbi

To control the number of partitions during transformations, I can set spark.sql.shuffle.partitions, for which the documentation states:

Configures the number of partitions to use when shuffling data for joins or aggregations.

Example (where spark is a working SparkSession):

spark.conf.set("spark.sql.shuffle.partitions", 500)

Additionally, I can set spark.default.parallelism, for which the Execution Behavior documentation states:

Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.

Example (where spark is a working SparkSession):

spark.conf.set("spark.default.parallelism", 500)
