Compute size of Spark dataframe - SizeEstimator gives unexpected results
Question
I am trying to find a reliable way to compute the size (in bytes) of a Spark dataframe programmatically.
The reason is that I would like to have a method to compute an "optimal" number of partitions ("optimal" could mean different things here: it could mean having an optimal partition size, or resulting in an optimal file size when writing to Parquet tables - but both can be assumed to be some linear function of the dataframe size). In other words, I would like to call coalesce(n) or repartition(n) on the dataframe, where n is not a fixed number but rather a function of the dataframe size.
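For illustration, here is a minimal sketch of what I have in mind (repartitionBySize, its estimateSizeInBytes parameter, and the 128 MB-per-partition target are hypothetical names and values I made up for this question, not an existing API):

import org.apache.spark.sql.DataFrame

// Hypothetical helper: pick a partition count from an estimated dataframe size.
def repartitionBySize(df: DataFrame,
                      estimateSizeInBytes: DataFrame => Long,
                      targetPartitionBytes: Long = 128L * 1024 * 1024): DataFrame = {
  val sizeInBytes = estimateSizeInBytes(df)
  // Round up, but keep at least one partition.
  val n = math.max(1, math.ceil(sizeInBytes.toDouble / targetPartitionBytes).toInt)
  df.repartition(n)
}

The open question is then what to plug in for estimateSizeInBytes.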
Other topics on SO suggest using SizeEstimator.estimate from org.apache.spark.util to get the size in bytes of the dataframe, but the results I'm getting are inconsistent.
First of all, I'm persisting my dataframe to memory:
df.cache().count
The Spark UI shows a size of 4.8GB in the Storage tab. Then, I run the following command to get the size from SizeEstimator:
import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df)
This gives a result of 115'715'808 bytes =~ 116MB. However, applying SizeEstimator to different objects leads to very different results. For instance, I try computing the size separately for each row in the dataframe and summing the results:
df.map(row => SizeEstimator.estimate(row.asInstanceOf[AnyRef])).reduce(_+_)
This results in a size of 12'084'698'256 bytes =~ 12GB. Or, I can try to apply SizeEstimator to every partition:
df.mapPartitions(
  iterator => Seq(SizeEstimator.estimate(
    iterator.toList.map(row => row.asInstanceOf[AnyRef]))).toIterator
).reduce(_+_)
which results again in a different size of 10'792'965'376 bytes =~ 10.8GB.
I understand there are memory optimizations / memory overhead involved, but after performing these tests I don't see how SizeEstimator can be used to get a sufficiently good estimate of the dataframe size (and consequently of the partition size, or resulting Parquet file sizes).
What is the appropriate way (if any) to apply SizeEstimator in order to get a good estimate of a dataframe size or of its partitions? If there isn't any, what is the suggested approach here?
Answer
Unfortunately, I was not able to get reliable estimates from SizeEstimator, but I could find another strategy - if the dataframe is cached, we can extract its size from queryExecution as follows:
df.cache.foreach(_ => ())
val catalyst_plan = df.queryExecution.logical
val df_size_in_bytes = spark.sessionState.executePlan(
  catalyst_plan).optimizedPlan.stats.sizeInBytes
For the example dataframe, this gives exactly 4.8GB (which also corresponds to the file size when writing to an uncompressed Parquet table).
This has the disadvantage that the dataframe needs to be cached, but it is not a problem in my case.
Replaced df.cache.foreach(_=>_) by df.cache.foreach(_ => ()), thanks to @DavidBenedeki for pointing it out in the comments.