How to balance my data across the partitions?


Problem Description



Edit: The answer helps, but I described my solution in: memoryOverhead issue in Spark.


I have an RDD with 202092 partitions, which reads a dataset created by others. I can manually see that the data is not balanced across the partitions; for example, some of them have 0 images and others have 4k, while the mean lies at 432. When processing the data, I got this error:

Container killed by YARN for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

while memoryOverhead is already boosted. I feel that some spikes are happening which make Yarn kill my container, because that spike overflows the specified borders.
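
For reference, raising that overhead typically looks something like the sketch below. The values are illustrative only (not the configuration actually used for this job); on Spark 1.x/2.x on YARN the setting is spark.yarn.executor.memoryOverhead and is interpreted in MB:

from pyspark import SparkConf, SparkContext

# Illustrative values only; not the configuration actually used for this job.
conf = (SparkConf()
        .set("spark.executor.memory", "14g")                  # on-heap executor memory
        .set("spark.yarn.executor.memoryOverhead", "4096"))   # off-heap headroom, in MB
sc = SparkContext(conf=conf)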

So what should I do to make sure that my data are (roughly) balanced across partitions?
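
One cheap way to quantify the imbalance first, without collecting the images themselves, is a per-partition record count. A minimal sketch, using the RDD called dataset from the code below:

sizes = dataset.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()

print(min(sizes), max(sizes), sum(sizes) / float(len(sizes)))   # smallest / largest / mean partition size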


My idea was that repartition() would work, since it invokes shuffling:

dataset = dataset.repartition(202092)

but I just got the very same error, despite the programming guide's instructions:

repartition(numPartitions)

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.


Check my toy example though:

data = sc.parallelize([0,1,2], 3).mapPartitions(lambda x: range((x.next() + 1) * 1000))
d = data.glom().collect()
len(d[0])     # 1000
len(d[1])     # 2000
len(d[2])     # 3000
repartitioned_data = data.repartition(3)
re_d = repartitioned_data.glom().collect()
len(re_d[0])  # 1854
len(re_d[1])  # 1754
len(re_d[2])  # 2392
repartitioned_data = data.repartition(6)
re_d = repartitioned_data.glom().collect()
len(re_d[0])  # 422
len(re_d[1])  # 845
len(re_d[2])  # 1643
len(re_d[3])  # 1332
len(re_d[4])  # 1547
len(re_d[5])  # 211
repartitioned_data = data.repartition(12)
re_d = repartitioned_data.glom().collect()
len(re_d[0])  # 132
len(re_d[1])  # 265
len(re_d[2])  # 530
len(re_d[3])  # 1060
len(re_d[4])  # 1025
len(re_d[5])  # 145
len(re_d[6])  # 290
len(re_d[7])  # 580
len(re_d[8])  # 1113
len(re_d[9])  # 272
len(re_d[10]) # 522
len(re_d[11]) # 66

Solution

The memory overhead limit issue is, I think, due to DirectMemory buffers used during fetch, and I believe it is fixed in 2.0.0. (We had the same issue, but stopped digging deeper when we found that upgrading to 2.0.0 resolved it. Unfortunately I don't have Spark issue numbers to back me up.)


The uneven partitions after repartition are surprising. Contrast with https://github.com/apache/spark/blob/v2.0.0/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L443. Spark even generates random keys in repartition, so it is not done with a hash that could be biased.
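
To make that mechanism concrete, the same idea can be spelled out by hand in PySpark: tag each record with a random target partition, hash-partition on the tag, then drop it. This is only a sketch (rebalance is a hypothetical helper, not a Spark API), applied here to the toy data RDD from the question:

import random

def rebalance(rdd, num_partitions):
    # Tag each record with a uniformly random partition id, shuffle on that key,
    # then drop the key again. Integer keys 0..num_partitions-1 hash to themselves,
    # so the resulting partition sizes should come out close to uniform.
    return (rdd.map(lambda x: (random.randrange(num_partitions), x))
               .partitionBy(num_partitions)
               .values())

print(rebalance(data, 6).glom().map(len).collect())   # roughly equal counts per partition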

I tried your example and got the exact same results with Spark 1.6.2 and Spark 2.0.0, but not from the Scala spark-shell:

scala> val data = sc.parallelize(1 to 3, 3).mapPartitions { it => (1 to it.next * 1000).iterator }
data: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:24

scala> data.mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res1: Seq[Int] = WrappedArray(1000, 2000, 3000)

scala> data.repartition(3).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res2: Seq[Int] = WrappedArray(1999, 2001, 2000)

scala> data.repartition(6).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res3: Seq[Int] = WrappedArray(999, 1000, 1000, 1000, 1001, 1000)

scala> data.repartition(12).mapPartitions { it => Iterator(it.toSeq.size) }.collect.toSeq
res4: Seq[Int] = WrappedArray(500, 501, 501, 501, 501, 500, 499, 499, 499, 499, 500, 500)

Such beautiful partitions!


(Sorry this is not a full answer. I just wanted to share my findings so far.)
