Limiting maximum size of dataframe partition


Problem description

When I write out a dataframe to, say, csv, a .csv file is created for each partition. Suppose I want to limit the max size of each file to, say, 1 MB. I could do the write multiple times and increase the argument to repartition each time. Is there a way I can calculate ahead of time what argument to use for repartition to ensure the max size of each file is less than some specified size?

I imagine there might be pathological cases where all the data ends up on one partition. So make the weaker assumption that we only want to ensure that the average file size is less than some specified amount, say 1 MB.

Recommended answer

1. Single dataframe solution

I was trying to find some clever idea that would not kill the cluster at the same time, and the only thing that came to my mind was:

  1. Calculate the size of a serialized row
  2. Get the number of rows in the DataFrame
  3. Repartition, dividing by the expected size
  4. Should work?

The code should look more like this:

import org.apache.spark.sql.DataFrame
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

val df: DataFrame = ??? // your df
val rowSize = getBytes(df.head)     // estimated size of one serialized row, in bytes
val rowCount = df.count()
val partitionSize = 1000000         // target size per file: ~1 MB, in bytes
// guard against 0 partitions when the estimated total size is below 1 MB
val noPartitions: Int = math.max(1, (rowSize * rowCount / partitionSize).toInt)
df.repartition(noPartitions).write.format(...) // save to csv

// just a helper function from https://stackoverflow.com/a/39371571/1549135
def getBytes(value: Any): Long = {
  val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(stream)
  oos.writeObject(value)
  oos.close()
  stream.toByteArray.length
}

While my first choice was to calculate each row's byte size, that would be terribly inefficient. So, unless the data size differs greatly from row to row, I would say that this solution will work. You could also calculate the size of every n-th row (a rough sketch of that sampling approach follows below); you get the idea.
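
As a small illustration of that sampling idea (my own sketch, not part of the original answer), you could estimate the average row size from a random sample and reuse the getBytes helper above; sampleFraction is just a hypothetical tuning knob:

val sampleFraction = 0.01 // hypothetical: sample roughly 1% of the rows
val sampledRows = df.sample(withReplacement = false, fraction = sampleFraction).collect()
val avgRowSize: Long =
  if (sampledRows.isEmpty) getBytes(df.head) // tiny dataframe: fall back to a single row
  else sampledRows.map(getBytes).sum / sampledRows.length
val noPartitionsSampled: Int = math.max(1, (avgRowSize * rowCount / partitionSize).toInt)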

Also, I just 'hope' that Long will be big enough to support the expected size when calculating noPartitions. If not (if you have a lot of rows), maybe it would be better to change the order of operations, e.g.:

// dividing first keeps the intermediate value small; toDouble avoids integer division truncating to 0
val noPartitions: Int = (rowSize.toDouble / partitionSize * rowCount).toInt

Again, this is just a drafted idea with no domain knowledge about your data.

2. Cross-system solution

While going through the apache-spark docs I found an interesting cross-system solution:

It is spark.sql.files.maxPartitionBytes, which is described as:

The maximum number of bytes to pack into a single partition when reading files.

The default value is 134217728 (128 MB).

So I suppose you could set it to 1000000 (1 MB) and it will have a permanent effect on your DataFrames. However, too small a partition size may greatly impact your performance!

You can set it up during SparkSession creation:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.sql.files.maxPartitionBytes", 1000000) // 1 MB, matching the value discussed above
  .getOrCreate()

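A quick way to see whether the setting is actually taking effect (again, just an illustrative sketch; the input path below is a placeholder) is to read some file-based source and check how many input partitions Spark created:

// point the path at any sizeable CSV you have available
val readBack = spark.read.option("header", "true").csv("/tmp/some_input.csv")
println(s"input partitions: ${readBack.rdd.getNumPartitions}")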

All of the above is only valid if (if I remember correctly) the csv is written with the same number of files as there are partitions of the DataFrame.
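
If you want to verify that last assumption for your own job (a hedged sketch; outputPath is a placeholder), you can compare the DataFrame's partition count with the number of part files the write produced:

import org.apache.hadoop.fs.{FileSystem, Path}

val outputPath = "/tmp/df_out_csv" // placeholder output directory
val repartitioned = df.repartition(noPartitions)
repartitioned.write.option("header", "true").csv(outputPath)

// count the part-* files the write created and compare with the partition count
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val partFiles = fs.listStatus(new Path(outputPath)).count(_.getPath.getName.startsWith("part-"))
println(s"partitions: ${repartitioned.rdd.getNumPartitions}, part files: $partFiles")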
