Why do I get so many empty partitions when repartitioning a Spark Dataframe?
Problem description
I want to partition a dataframe "df1" on 3 columns. This dataframe has exactly 990 unique combinations of those 3 columns:
In [17]: df1.createOrReplaceTempView("df1_view")
In [18]: spark.sql("select count(*) from (select distinct(col1,col2,col3) from df1_view) as t").show()
+--------+
|count(1)|
+--------+
| 990|
+--------+
In order to optimize the processing of this dataframe, I want to repartition df1 into 990 partitions, one for each key possibility:
In [19]: df1.rdd.getNumPartitions()
Out[19]: 24
In [20]: df2 = df1.repartition(990, "col1", "col2", "col3")
In [21]: df2.rdd.getNumPartitions()
Out[21]: 990
I wrote a simple way to count the rows in each partition:
In [22]: def f(iterator):
    ...:     count = 0
    ...:     for row in iterator:  # the iterator yields the rows of one partition
    ...:         count += 1
    ...:     print(count)          # one count per partition (printed on the executors)
    ...:
In [23]: df2.foreachPartition(f)
What I actually get is 628 partitions holding one or more key values, and 362 empty partitions.
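A quicker way to gather the same per-partition sizes on the driver (a small sketch; glom() materializes each partition as a list, so it is only practical when the partitions are small):

# Per-partition row counts, collected on the driver.
sizes = df2.rdd.glom().map(len).collect()

print(sum(1 for s in sizes if s == 0), "empty partitions out of", len(sizes))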
I assumed Spark would repartition evenly (1 key value = 1 partition), but that does not seem to be the case, and I feel like this repartitioning is adding data skew even though it should be the other way around...
What algorithm does Spark use to partition a dataframe on columns? Is there a way to achieve what I thought was possible?
I'm using Spark 2.2.0 on Cloudera.
Recommended answer
To distribute data across partitions, Spark needs some way to convert a column value into a partition index. There are two default partitioners in Spark - HashPartitioner and RangePartitioner. Different transformations in Spark can apply different partitioners - e.g. join will apply a hash partitioner.
Basically, the hash partitioner's formula for converting a value to a partition index is value.hashCode() % numOfPartitions. In your case, multiple values map to the same partition index. This is exactly what you'd expect: assuming a roughly uniform hash, throwing 990 distinct keys into 990 buckets leaves an expected fraction of about (1 - 1/n)^n ≈ 1/e ≈ 37% of the buckets empty, which matches the 362 empty partitions you observed (362/990 ≈ 37%).
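You can reproduce the collisions from the DataFrame API. A minimal sketch, assuming the df1/col1/col2/col3 names from the question; hash (Murmur3) and pmod are the built-in SQL functions that Spark's hash partitioning expressions are based on:

from pyspark.sql import functions as F

# Partition index that hash partitioning assigns to each row:
# pmod(hash(col1, col2, col3), numPartitions). Written with expr()
# because pmod is only exposed in pyspark.sql.functions in Spark
# versions newer than 2.2.
indexed = df1.withColumn("pid", F.expr("pmod(hash(col1, col2, col3), 990)"))

# Number of distinct partition indices actually hit - fewer than 990,
# matching the count of non-empty partitions observed above.
indexed.select("pid").distinct().count()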
You could implement your own partitioner if you want a better distribution. More about that is covered here and here and here.
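For instance, at the RDD level PySpark's partitionBy accepts a custom partition function, so you can pin each of the 990 keys to its own partition. A hypothetical sketch, assuming the distinct keys fit comfortably in driver memory; index_of and keyed are illustrative names:

# Build a key -> partition-id lookup table from the 990 distinct keys.
keys = (df1.select("col1", "col2", "col3")
           .distinct()
           .rdd.map(tuple)
           .collect())
index_of = {key: i for i, key in enumerate(keys)}

# Key each row by its (col1, col2, col3) tuple, then partition with a
# custom function: exactly one key per partition, no empty partitions.
keyed = df1.rdd.keyBy(lambda row: (row["col1"], row["col2"], row["col3"]))
partitioned = keyed.partitionBy(len(keys), lambda key: index_of[key])

partitioned.glom().map(len).min()  # every partition now holds at least 1 row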