Why do I get so many empty partitions when repartitioning a Spark Dataframe?


Problem description

I want to partition a dataframe "df1" on 3 columns. This dataframe has exactly 990 unique combinations of those 3 columns:

In [17]: df1.createOrReplaceTempView("df1_view")

In [18]: spark.sql("select count(*) from (select distinct(col1,col2,col3) from df1_view) as t").show()
+--------+                                                                      
|count(1)|
+--------+
|     990|
+--------+

To optimize the processing of this dataframe, I want to repartition df1 so that I get 990 partitions, one for each key possibility:

In [19]: df1.rdd.getNumPartitions()
Out[19]: 24

In [20]: df2 = df1.repartition(990, "col1", "col2", "col3")

In [21]: df2.rdd.getNumPartitions()
Out[21]: 990

I wrote a simple way to count rows in each partition:

In [22]: def f(iterator):
    ...:     # Count the rows in this partition; print() runs on the executor, not the driver
    ...:     count = 0
    ...:     for row in iterator:
    ...:         count += 1
    ...:     print(count)
    ...: 

In [23]: df2.foreachPartition(f)

What I actually get is 628 partitions containing one or more key values, and 362 empty partitions.
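
Since foreachPartition prints on the executors, a variant that collects the per-partition sizes back to the driver makes the 628/362 split easier to reproduce; a minimal sketch (the variable names are mine):

# Count the rows in every partition and bring the list of sizes back to the driver
sizes = df2.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()

empty = sum(1 for s in sizes if s == 0)
print(len(sizes), "partitions,", empty, "of them empty")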

I assumed Spark would repartition evenly (1 key value = 1 partition), but that does not seem to be the case, and I feel like this repartitioning is adding data skew even though it should be the other way around...

What algorithm does Spark use to partition a dataframe on columns? Is there a way to achieve what I thought was possible?

I'm using Spark 2.2.0 on Cloudera.

Recommended answer

To distribute data across partitions, Spark needs some way to convert the values of the columns into a partition index. There are two default partitioners in Spark: HashPartitioner and RangePartitioner. Different transformations in Spark can apply different partitioners, e.g. join applies the hash partitioner.

For the hash partitioner, the formula to convert a value to a partition index is basically value.hashCode() % numOfPartitions. In your case, multiple values map to the same partition index.
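
Why 990 distinct keys still collide can be seen with a toy simulation. The sketch below uses Python's built-in hash as a stand-in for the hash Spark applies to the columns (the actual function differs, so the exact counts won't match), with 990 made-up keys:

# 990 made-up (col1, col2, col3) combinations
keys = [("a", i, j) for i in range(33) for j in range(30)]

num_partitions = 990
occupied = {hash(k) % num_partitions for k in keys}

print(len(occupied))                   # noticeably fewer than 990 distinct indices
print(num_partitions - len(occupied))  # the remaining partitions stay empty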

You could implement your own partitioner if you want a better distribution. More about it here, here and here.
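
In PySpark, a custom partitioner can only be plugged in at the RDD level, via partitionBy on a pair RDD. A sketch of the idea for this case, assuming the column names from the question (the explicit key_to_index mapping is mine):

# Enumerate the 990 distinct keys and give each one its own partition index
distinct_keys = [tuple(r) for r in df1.select("col1", "col2", "col3").distinct().collect()]
key_to_index = {k: i for i, k in enumerate(distinct_keys)}

# Key the rows by (col1, col2, col3) and shuffle with the custom partition function,
# so every key lands in its own partition and none stays empty
pair_rdd = df1.rdd.keyBy(lambda row: (row["col1"], row["col2"], row["col3"]))
partitioned = pair_rdd.partitionBy(len(key_to_index), lambda key: key_to_index[key])

partitioned.values() then recovers the original rows; note that this works on the RDD, so the result is no longer a DataFrame.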
