How does HashPartitioner work?

Problem description

I read up on the documentation of HashPartitioner. Unfortunately nothing much was explained except for the API calls. I am under the assumption that HashPartitioner partitions the distributed set based on the hash of the keys. For example if my data is like

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)

So the partitioner would put this into different partitions, with the same keys falling in the same partition. However, I do not understand the significance of the constructor argument:

new HashPartitioner(numPartitions) // What does numPartitions do?

For the above dataset, how would the results differ if I did

new HashPartitioner(1)
new HashPartitioner(2)
new HashPartitioner(10)

So how does HashPartitioner work actually?

Solution

Well, let's make your dataset marginally more interesting:

val rdd = sc.parallelize(for {
    x <- 1 to 3
    y <- 1 to 2
} yield (x, None), 8)

We have six elements:

rdd.count

Long = 6

no partitioner:

rdd.partitioner

Option[org.apache.spark.Partitioner] = None

and eight partitions:

rdd.partitions.length

Int = 8

Now let's define a small helper to count the number of elements per partition:

import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

Since we don't have a partitioner, our dataset is distributed uniformly between partitions (Default Partitioning Scheme in Spark):

countByPartition(rdd).collect()

Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)

Now let's repartition our dataset:

import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))

Since the parameter passed to HashPartitioner defines the number of partitions, we expect one partition:

rddOneP.partitions.length

Int = 1

Since we have only one partition it contains all elements:

countByPartition(rddOneP).collect

Array[Int] = Array(6)

Note that the order of values after the shuffle is non-deterministic.

In the same way, if we use HashPartitioner(2)

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))

we'll get 2 partitions:

rddTwoP.partitions.length

Int = 2

Since the rdd is partitioned by key, the data won't be distributed uniformly anymore:

countByPartition(rddTwoP).collect()

Array[Int] = Array(2, 4)

Because we have three keys and only two different values of hashCode mod numPartitions, there is nothing unexpected here:

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))

scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))

Just to confirm the above:

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()

Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))

Finally, with HashPartitioner(7) we get seven partitions, three of them non-empty with two elements each:

val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length

Int = 7

countByPartition(rddSevenP).collect()

Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
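
The original answer leaves this result as-is, but the same hashCode-mod check used above for two partitions explains it; repeating it with modulo 7 (added here purely for illustration) shows which three slots receive the keys:

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 7))

scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,2), (3,3,3))

So partitions 1, 2 and 3 each receive one key (two elements), while the other four stay empty.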

Summary and Notes

  • HashPartitioner takes a single argument which defines the number of partitions.
  • Values are assigned to partitions using the hash of the keys. The hash function may differ depending on the language (Scala RDDs may use hashCode, Datasets use MurmurHash 3, PySpark uses portable_hash).

    In a simple case like this, where the key is a small integer, you can assume that the hash is the identity (hash(i) = i).

    The Scala API uses nonNegativeMod to determine the partition based on the computed hash; a minimal sketch of this logic is shown after this list.

  • If the distribution of keys is not uniform, you can end up in situations where part of your cluster is idle.

  • Keys have to be hashable. You can check my answer to A list as a key for PySpark's reduceByKey to read about PySpark-specific issues. Another possible problem is highlighted by the HashPartitioner documentation (a short demonstration follows after this list):

    Java arrays have hashCodes that are based on the arrays' identities rather than their contents, so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will produce an unexpected or incorrect result.

  • In Python 3, you have to make sure that hashing is consistent. See What does Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED mean in pyspark?

  • Hash partitioner is neither injective nor surjective. Multiple keys can be assigned to a single partition and some partitions can remain empty.

  • Please note that currently hash-based methods don't work in Scala when combined with REPL-defined case classes (see Case class equality in Apache Spark).

  • HashPartitioner (or any other Partitioner) shuffles the data. Unless the partitioning is reused between multiple operations, it doesn't reduce the amount of data to be shuffled.
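
As referenced in the notes above, here is a minimal, self-contained sketch of how a key ends up in a partition. It is modeled on Spark's HashPartitioner and its nonNegativeMod utility but simplified for illustration; partitionOf is just a made-up helper name, not a Spark API:

// Keep the result in [0, mod) even for negative hash codes,
// which is what Spark's nonNegativeMod utility does.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

// Partition index for a key given the number of partitions
// (Spark sends null keys to partition 0).
def partitionOf(key: Any, numPartitions: Int): Int = key match {
  case null => 0
  case _    => nonNegativeMod(key.hashCode, numPartitions)
}

// Reproduces the distributions seen in the walkthrough for keys 1 to 3:
(1 to 3).map(k => (k, partitionOf(k, 2)))   // Vector((1,1), (2,0), (3,1))
(1 to 3).map(k => (k, partitionOf(k, 7)))   // Vector((1,1), (2,2), (3,3))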
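
The Java-array caveat quoted in the notes is easy to see in a plain Scala REPL; the following lines are illustrative only:

val a1 = Array(1, 2, 3)
val a2 = Array(1, 2, 3)

// Java arrays use identity-based hash codes, so equal contents still hash differently:
a1.hashCode == a2.hashCode               // false in practice

// Wrapping the array in a Seq gives a content-based hash, a common workaround
// when array-like data has to serve as a key:
a1.toSeq.hashCode == a2.toSeq.hashCode   // true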
