Is there an effective partitioning method when using reduceByKey in Spark?
Question
When I use reduceByKey or aggregateByKey, I'm confronted with partition problems.
ex) reduceByKey(_ + _).map(code)
Especially, if the input data is skewed, the partitioning problem becomes even worse when using the above methods.
So, as a solution to this, I use the repartition method, for example something similar to http://dev.sortable.com/spark-repartition/.
This is good for partition distribution, but repartition is also expensive.
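For reference, a minimal sketch of that workaround (the RDD name, the downstream step, and the partition count are illustrative, not from the original question):

val result = pairs                 // pairs: a hypothetical RDD[(K, V)]
  .reduceByKey(_ + _)              // the aggregation whose output partitions end up skewed
  .repartition(200)                // extra full shuffle, but evens out partition sizes
  .map(expensiveStep)              // hypothetical downstream transformation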
Is there a way to solve the partition problem wisely?
Answer
You have to distinguish between two different problems:
Data skew
If the data distribution is highly skewed (let's assume the worst case scenario with only a single unique key), then by definition the output will be skewed and changing the partitioner cannot help you.
There are some techniques which can be used to partially address the problem, but overall partitioning is not the core issue here.
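One such technique, which the answer does not spell out, is key salting; the following is a minimal sketch assuming an RDD[(String, Long)] whose values are summed, with names and the salt factor chosen for illustration:

import scala.util.Random

def saltedSum(rdd: org.apache.spark.rdd.RDD[(String, Long)], saltFactor: Int) =
  rdd
    .map { case (k, v) => ((k, Random.nextInt(saltFactor)), v) } // spread each hot key over saltFactor sub-keys
    .reduceByKey(_ + _)                                          // partial sums per (key, salt)
    .map { case ((k, _), v) => (k, v) }                          // drop the salt
    .reduceByKey(_ + _)                                          // merge the partial sums per key

This distributes the partial aggregation of a hot key across several tasks at the cost of a second shuffle.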
Partition skew
A poorly chosen partitioning function can result in a skewed data distribution even if the data is uniformly distributed. For example:
val rdd = sc.parallelize(Seq((5, None), (10, None), (15, None), (20, None)), 5)
rdd
.partitionBy(new org.apache.spark.HashPartitioner(5))
.glom.map(_.size).collect
Array[Int] = Array(4, 0, 0, 0, 0)
As you can see, even though the key distribution itself is not skewed, apparent skew has been induced by a regularity in the data combined with the poor properties of hashCode: for an Int, hashCode is the value itself, and every key here is a multiple of 5, so all records fall into the same bucket modulo 5.
In a case like this, choosing a different Partitioner:
rdd
.partitionBy(new org.apache.spark.RangePartitioner(5, rdd))
.glom.map(_.size).collect
Array[Int] = Array(1, 1, 1, 1, 0)
or adjusting the properties of the existing one (here, the number of partitions; 7 shares no factor with the key spacing of 5, so the keys no longer collide in a single bucket):
rdd
.partitionBy(new org.apache.spark.HashPartitioner(7))
.glom.map(_.size).collect
Array[Int] = Array(0, 1, 0, 1, 0, 1, 1)
can resolve the problem.
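As a side note not in the original answer: since the question is about reduceByKey, the chosen Partitioner (or just a target partition count) can also be passed to reduceByKey directly, avoiding the separate partitionBy step; the RDD name below is illustrative:

pairs.reduceByKey(new org.apache.spark.HashPartitioner(7), _ + _)
// or equivalently, with only a partition count:
pairs.reduceByKey(_ + _, 7)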