Is there an effective partitioning method when using reduceByKey in Spark?


Problem description

When I use reduceByKey or aggregateByKey, I'm confronted with partition problems.

ex) reduceByKey(_ + _).map(code)

Especially if the input data is skewed, the partitioning problem becomes even worse when using the above methods.

So, as a solution to this, I use the repartition method.

For example, similar to http://dev.sortable.com/spark-repartition/.

This is good for partition distribution, but repartition is also expensive.
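
A minimal sketch of that pattern, assuming a spark-shell sc and a hypothetical skewed pairs RDD (names are illustrative):

val pairs = sc.parallelize(Seq.fill(1000)(("hot", 1)) ++ Seq(("a", 1), ("b", 1)))

pairs
  .reduceByKey(_ + _)   // output layout follows the default HashPartitioner
  .repartition(8)       // evens out partition sizes, but costs a full extra shuffle
  .map(identity)        // stand-in for the map(code) step above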

Is there a way to solve the partition problem wisely?

Answer

You have to distinguish between two different problems:

Data skew

If the data distribution is highly skewed (let's assume the worst-case scenario with only a single unique key), then by definition the output will be skewed and changing the partitioner cannot help you.

There are some techniques that can be used to partially address the problem, but overall partitioning is not the core issue here.
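
One such technique (key salting; the answer does not name it, and skewedPairs below is a hypothetical pair RDD) is to spread each hot key over a bounded set of sub-keys, aggregate, then strip the salt and aggregate again:

import scala.util.Random

val salted  = skewedPairs.map { case (k, v) => ((k, Random.nextInt(10)), v) }  // attach a 0-9 salt
val partial = salted.reduceByKey(_ + _)                                        // better-balanced first shuffle
val totals  = partial.map { case ((k, _), v) => (k, v) }.reduceByKey(_ + _)    // drop the salt, combine per key

This works because reduceByKey already requires an associative, commutative function, so splitting the reduction into two rounds does not change the result.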

Partitioner bias

A poorly chosen partitioning function can result in a skewed data distribution even if the data is uniformly distributed. For example:

val rdd = sc.parallelize(Seq((5, None), (10, None), (15, None), (20, None)), 5)
rdd
  .partitionBy(new org.apache.spark.HashPartitioner(5))
  .glom.map(_.size).collect

Array[Int] = Array(4, 0, 0, 0, 0)

As you can see, despite the fact that the key distribution is not skewed, skew has been induced by regularities in the data and the poor properties of hashCode.
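
For the keys above this is easy to verify by hand (a small illustration added here, not part of the original answer): an Int's hashCode is the value itself, and every key in the example is a multiple of 5, so all of them fall into bucket 0 of a 5-partition HashPartitioner:

Seq(5, 10, 15, 20).map(k => k.hashCode % 5)

List[Int] = List(0, 0, 0, 0)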

In a case like this, choosing a different Partitioner:

rdd
  .partitionBy(new org.apache.spark.RangePartitioner(5, rdd))
  .glom.map(_.size).collect

Array[Int] = Array(1, 1, 1, 1, 0)

or tweaking the properties of the existing one:

rdd
  .partitionBy(new org.apache.spark.HashPartitioner(7))
  .glom.map(_.size).collect

Array[Int] = Array(0, 1, 0, 1, 0, 1, 1)

can resolve the problem.
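
One closing note (a hedged sketch, not part of the original answer; pairs is again a hypothetical pair RDD): reduceByKey itself accepts a Partitioner or a target number of partitions, so the chosen layout can be produced by the aggregation shuffle instead of a separate repartition, which avoids the extra shuffle the question is worried about:

pairs.reduceByKey(new org.apache.spark.HashPartitioner(7), _ + _)

// or, keeping the default hash partitioning but with an explicit partition count:
pairs.reduceByKey(_ + _, 7)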

