Apache Spark - HashPartitioner: How does it work?


Question

I read up on the documentation of HashPartitioner (http://spark.apache.org/docs/1.3.1/api/java/index.html?org/apache/spark/HashPartitioner.html). Unfortunately nothing much was explained except for the API calls. I am under the assumption that HashPartitioner partitions the distributed set based on the hash of the keys. For example, if my data is like

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)

So the Partitioner would put this into different partitions, with the same keys falling in the same partition. However, I do not understand the significance of the constructor argument:

new HashPartitioner(numPartitions) // What does numPartitions do?

For the above dataset, how would the results differ if I did

new HashPartitioner(1)
new HashPartitioner(2)
new HashPartitioner(10)

So how does HashPartitioner actually work?

Answer

Well, let's make your dataset marginally more interesting:

val rdd = sc.parallelize(for {
    x <- 1 to 3
    y <- 1 to 2
} yield (x, None), 8)

We have six elements:

scala> rdd.count
res32: Long = 6

no partitioner:

scala> rdd.partitioner
res33: Option[org.apache.spark.Partitioner] = None

and 8 partitions:

scala> rdd.partitions.length
res35: Int = 8

Now let's define a small helper to count the number of elements per partition:

import org.apache.spark.rdd.RDD

// Counts the elements in each partition without collecting the data itself.
def countByPartition(rdd: RDD[(Int, None.type)]) = {
    rdd.mapPartitions(iter => Iterator(iter.length))
}

Since we don't have a partitioner, our dataset is distributed uniformly between partitions:

scala> countByPartition(rdd).collect()
res43: Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)

Now let's repartition our dataset:

import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))

Since the parameter passed to HashPartitioner defines the number of partitions, we expect one partition:

scala> rddOneP.partitions.length
res45: Int = 1

Since we have only one partition, it contains all elements:

scala> countByPartition(rddOneP).collect
res48: Array[Int] = Array(6)
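With a single partition the rule is trivial: any hashCode modulo 1 is 0, so every key is assigned to partition 0. As a quick sanity check, here is a sketch using getPartition (the public method every Spark Partitioner exposes), run in the same shell session:

val p1 = new HashPartitioner(1)
(1 to 3).map(k => p1.getPartition(k))  // Vector(0, 0, 0) -- every key maps to partition 0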

The same way, if we use HashPartitioner(2)

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))

we will get 2 partitions:

scala> rddTwoP.partitions.length
res50: Int = 2

Since the rdd is partitioned by key, the data won't be distributed uniformly anymore:

scala> countByPartition(rddTwoP).collect()
res51: Array[Int] = Array(2, 4)

Because we have three keys and only two different values of hashCode mod numPartitions, there is nothing unexpected here:

scala> (1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
res55: scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))

Just to confirm the above:

scala> rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
res58: Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
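For a non-null key the rule HashPartitioner applies is, in effect, the key's hashCode modulo numPartitions, adjusted to be non-negative (Scala's % can return a negative result for negative hash codes). A minimal sketch of that rule, checked against getPartition; partitionOf is just an illustrative helper, not part of the Spark API:

// Sketch: partition = non-negative (key.hashCode % numPartitions)
def partitionOf(key: Any, numPartitions: Int): Int = {
  val rawMod = key.hashCode % numPartitions
  rawMod + (if (rawMod < 0) numPartitions else 0)
}

val p2 = new HashPartitioner(2)
(1 to 3).map(k => (k, partitionOf(k, 2), p2.getPartition(k)))
// Vector((1,1,1), (2,0,0), (3,1,1)) -- matches the hashCode % 2 values above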

Finally, with HashPartitioner(10) we get ten partitions, three non-empty with 2 elements each:

scala> val rddTenP = rdd.partitionBy(new HashPartitioner(10))
scala> rddTenP.partitions.length
res61: Int = 10

scala> countByPartition(rddTenP).collect()
res62: Array[Int] = Array(0, 2, 2, 2, 0, 0, 0, 0, 0, 0)
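The three non-empty partitions are exactly the ones picked by hashCode mod 10, which getPartition confirms directly (same shell session):

val p10 = new HashPartitioner(10)
(1 to 3).map(k => (k, p10.getPartition(k)))
// Vector((1,1), (2,2), (3,3)) -- keys 1, 2 and 3 land in partitions 1, 2 and 3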


Summary

• HashPartitioner takes a single argument which defines the number of partitions
• values are assigned to partitions using the hashCode of the keys
• if the distribution of keys is not uniform you can end up in situations where part of your cluster is idle
• keys have to be hashable. You can check my answer for "A list as a key for PySpark's reduceByKey" to read about PySpark-specific issues. Another possible problem is highlighted by the HashPartitioner documentation (a short illustration follows after this summary):


  Java arrays have hashCodes that are based on the arrays' identities rather than their contents, so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will produce an unexpected or incorrect result.


• In Python 3 you have to make sure that hashing is consistent. See "What does Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED mean in pyspark?"
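To illustrate the array caveat quoted above: two arrays with the same contents have identity-based hashCodes, so HashPartitioner treats them as different keys. A minimal sketch (the exact partition indices depend on the identity hash codes, so they may occasionally coincide):

val arrayPartitioner = new HashPartitioner(4)
val a = Array(1, 2, 3)
val b = Array(1, 2, 3)

a.sameElements(b)         // true  -- same contents
a.hashCode == b.hashCode  // almost certainly false -- identity-based hash codes
arrayPartitioner.getPartition(a) == arrayPartitioner.getPartition(b)
// typically false -- logically identical keys can end up in different partitions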
