Why do I get so many empty partitions when repartitioning a Spark DataFrame?


Problem Description

I want to partition a dataframe "df1" on 3 columns. This dataframe has exactly 990 unique combinations of values for those 3 columns:

In [17]: df1.createOrReplaceTempView("df1_view")

In [18]: spark.sql("select count(*) from (select distinct(col1,col2,col3) from df1_view) as t").show()
+--------+                                                                      
|count(1)|
+--------+
|     990|
+--------+

In order to optimize the processing of this dataframe, I want to repartition df1 to get 990 partitions, one for each possible key:

In [19]: df1.rdd.getNumPartitions()
Out[19]: 24

In [20]: df2 = df1.repartition(990, "col1", "col2", "col3")

In [21]: df2.rdd.getNumPartitions()
Out[21]: 990

I wrote a simple way to count the rows in each partition:

In [22]: def f(iterator):
    ...:     # count the rows that ended up in this partition
    ...:     a = 0
    ...:     for row in iterator:
    ...:         a = a + 1
    ...:     print(a)
    ...: 

In [23]: df2.foreachPartition(f)
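
Note that print inside foreachPartition runs on the executors, so in cluster mode the counts end up in the executor logs rather than the driver console. A sketch of an alternative (using the df2 built above) that brings the per-partition counts back to the driver with spark_partition_id():

from pyspark.sql.functions import spark_partition_id

# Tag every row with the id of the partition it lives in, then count rows per id.
# Empty partitions simply never show up here, so the number of distinct ids
# is the number of non-empty partitions.
counts = (df2
          .withColumn("pid", spark_partition_id())
          .groupBy("pid")
          .count()
          .orderBy("pid"))
counts.show(990)
print(counts.count())  # number of non-empty partitions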

I notice that what I actually get is 628 partitions holding one or more key values, and 362 empty partitions.

I assumed Spark would repartition in an even way (1 key value = 1 partition), but that does not seem to be the case, and I feel like this repartitioning is adding data skew even though it should be the other way around...

What algorithm does Spark use to partition a dataframe on columns? Is there a way to achieve what I thought was possible?

I'm using Spark 2.2.0 on Cloudera.

Recommended Answer

To distribute data across partitions, Spark needs to somehow convert the value of the columns into a partition index. There are two default partitioners in Spark - HashPartitioner and RangePartitioner. Different transformations in Spark can apply different partitioners - e.g. join will apply a hash partitioner.

Basically, the formula the hash partitioner uses to convert a value into a partition index is value.hashCode() % numOfPartitions. In your case, multiple values map to the same partition index.
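
For intuition (this arithmetic is not in the original answer): hashing 990 distinct keys into 990 partitions behaves like throwing 990 balls into 990 bins. With a uniform hash the expected fraction of empty bins is (1 - 1/990)^990 ≈ 1/e ≈ 0.37, i.e. roughly 364 empty partitions out of 990, which is very close to the 362 empty partitions observed in the question. A minimal simulation, using Python's built-in hash rather than Spark's actual hash function:

from itertools import product

# 10 * 9 * 11 = 990 distinct 3-column keys, bucketed with hash(key) % numPartitions.
num_partitions = 990
keys = [("a%d" % i, "b%d" % j, "c%d" % k)
        for i, j, k in product(range(10), range(9), range(11))]
non_empty = {hash(key) % num_partitions for key in keys}
print("non-empty partitions:", len(non_empty))
print("empty partitions:", num_partitions - len(non_empty))

So even a perfectly uniform hash leaves roughly a third of the partitions empty when the number of partitions equals the number of keys; the empty partitions come from hash collisions, not from skew in the data.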

You could implement your own partitioner if you want a better distribution. More about it is here, here and here.
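
A minimal sketch of that idea in PySpark, assuming the col1/col2/col3 names from the question; key_to_index is a hypothetical lookup, built on the driver, that gives each of the 990 distinct key tuples its own partition index:

# Hypothetical helper: map each distinct (col1, col2, col3) tuple to its own partition index.
distinct_keys = df1.select("col1", "col2", "col3").distinct().collect()
key_to_index = {tuple(r): i for i, r in enumerate(distinct_keys)}

# Key every row by its tuple, partition with a custom function, and rebuild the DataFrame:
# exactly one partition per key, hence no empty partitions and no collisions.
keyed = df1.rdd.map(lambda row: ((row["col1"], row["col2"], row["col3"]), row))
partitioned = keyed.partitionBy(len(key_to_index), lambda key: key_to_index[key])
df2 = spark.createDataFrame(partitioned.values(), schema=df1.schema)

This only pays off if the downstream processing really benefits from one key per partition; collecting the distinct keys to the driver is cheap for 990 keys but would not scale to a very large key space.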
