Limiting maximum size of dataframe partition

Question

When I write out a dataframe to, say, csv, a .csv file is created for each partition. Suppose I want to limit the max size of each file to, say, 1 MB. I could do the write multiple times and increase the argument to repartition each time. Is there a way I can calculate ahead of time what argument to use for repartition to ensure the max size of each file is less than some specified size.

I imagine there might be pathological cases where all the data ends up on one partition. So make the weaker assumption that we only want to ensure that the average file size is less than some specified amount, say 1 MB.
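For concreteness, the trial-and-error approach mentioned above looks roughly like this (just a sketch; df, the partition count, and the output path are placeholders, not part of the original question):

// Hypothetical sketch of the trial-and-error approach: pick a partition
// count, write, inspect the resulting file sizes, then retry with a larger count.
val numPartitions = 10 // guess, increase if the files come out too large
df.repartition(numPartitions)
  .write
  .option("header", "true")
  .csv("/tmp/output") // placeholder path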

Answer

1. Single dataframe solution

I was trying to come up with a clever approach that would not kill the cluster in the process, and the only thing that came to my mind was:

  1. Calculate the size of a serialized row
  2. Get the number of rows in the DataFrame
  3. Repartition, dividing by the expected partition size
  4. Should work?

The code should look more like this:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import org.apache.spark.sql.DataFrame

val df: DataFrame = ??? // your df
val rowSize = getBytes(df.head)
val rowCount = df.count()
val partitionSize = 1000000 // target partition size: 1 MB, in bytes
val noPartitions: Int = (rowSize * rowCount / partitionSize).toInt
df.repartition(noPartitions).write.format(...) // save to csv

// helper function from https://stackoverflow.com/a/39371571/1549135 that
// serializes a value with Java serialization and returns its size in bytes
def getBytes(value: Any): Long = {
  val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(stream)
  oos.writeObject(value)
  oos.close()
  stream.toByteArray.length
}

While my first choice was to calculate each row's byte size, that would be terribly inefficient. So, unless the data size differs greatly from row to row, I would say that this solution will work. You could also calculate the size of every n-th row instead. You get the idea.
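One way to approximate the "every n-th row" idea is to estimate the average row size from a small random sample instead of a single row. Just a sketch under that assumption; sampleFraction is a made-up tuning knob and getBytes is the helper above:

// Rough estimate of the average row size from a random sample
// (falls back to the first row if the sample happens to be empty)
val sampleFraction = 0.01
val sampledRows = df.sample(withReplacement = false, fraction = sampleFraction).collect()
val avgRowSize: Long =
  if (sampledRows.isEmpty) getBytes(df.head)
  else sampledRows.map(getBytes).sum / sampledRows.length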

Also, I just 'hope' that a Long will be big enough to hold the intermediate value when calculating noPartitions. If not (if you have a lot of rows), it might be better to change the order of operations, e.g.:

val noPartitions: Int = (rowSize.toDouble / partitionSize * rowCount).toInt // Double avoids overflow and integer truncation to 0

Again, this is just a drafted idea with no domain knowledge about your data.
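Whichever ordering you use, it may also be worth rounding up and guarding against a result of 0, since repartition requires a positive number of partitions. A small sketch of that guard (my addition, building on the variables above):

val safeNoPartitions: Int =
  math.max(1, math.ceil(rowSize.toDouble * rowCount / partitionSize).toInt)
df.repartition(safeNoPartitions) // never asks for 0 partitions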

While going through the apache-spark docs I have found an interesting cross-system solution:

spark.sql.files.maxPartitionBytes, which sets:

The maximum number of bytes to pack into a single partition when reading files.

The default value is 134217728 bytes (128 MB).

So I suppose you could set it to 1000000 (1 MB) and it will have a permanent effect on your DataFrames. However, too small a partition size may greatly impact your performance!

You can set it up, during SparkSession creation:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.sql.files.maxPartitionBytes", 1000000L) // 1 MB, matching the value discussed above
  .getOrCreate()
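
If a session already exists, the same property can also be changed at runtime through the session's conf; a minimal sketch, assuming spark is your existing SparkSession:

// Takes effect for subsequent file reads in this session
spark.conf.set("spark.sql.files.maxPartitionBytes", 1000000L)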

All of the above is only valid if (if I remember correctly) the csv is written with the same number of files as there are partitions of the DataFrame.
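
To check that assumption before writing, you can look at the DataFrame's partition count, which is (roughly) the number of part files a plain write will produce; a quick sketch:

val numParts = df.rdd.getNumPartitions
println(s"The write should produce about $numParts part files")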
