Can only zip RDDs with same number of elements in each partition despite repartition


Problem Description

I open a dataset:

val data = sc.textFile("/home/kybe/Documents/datasets/img.csv",defp)

I want to put an index on this data, so I do:

val nb = data.count.toInt
val tozip = sc.parallelize(1 to nb).repartition(data.getNumPartitions)

val res = tozip.zip(data)

Unfortunately I get the following error:

Can only zip RDDs with same number of elements in each partition

How can I modify the number of elements per partition, if that is possible?

Recommended Answer

The documentation of zip() (https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#zip(org.apache.spark.rdd.RDD,%20scala.reflect.ClassTag)) states:

Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc. Assumes that the two RDDs have the same number of partitions and the same number of elements in each partition (e.g. one was made through a map on the other).

So we need to make sure we meet two conditions:


  • Both RDDs have the same number of partitions

  • The respective partitions in those RDDs have exactly the same size

You are making sure that you will have the same number of partitions with repartition(), but Spark doesn't guarantee that you will have the same distribution in each partition for each RDD.
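
For illustration, here is a minimal sketch (assuming a spark-shell session with the data and tozip RDDs from the question) that inspects the per-partition sizes of both RDDs; the printed numbers are made up:

// Count how many elements each partition actually holds.
val dataSizes  = data.mapPartitions(it => Iterator(it.size)).collect()
val tozipSizes = tozip.mapPartitions(it => Iterator(it.size)).collect()

println(dataSizes.mkString(", "))   // e.g. 512, 488  -- follows the file's line/block layout
println(tozipSizes.mkString(", "))  // e.g. 500, 500  -- follows repartition()'s shuffle
// zip() fails as soon as any pair of corresponding partitions differs in size.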

This is because there are different types of RDDs, and most of them have different partitioning strategies! For example:


  • A ParallelCollectionRDD is created when you parallelize a collection with sc.parallelize(collection). It determines how many partitions there should be, checks the size of the collection, and calculates the step size. I.e. if you have 15 elements in the list and want 4 partitions, the first 3 partitions will get 4 consecutive elements each and the last one gets the remaining 3 (see the sketch after this list).

  • A HadoopRDD has, if I remember correctly, one partition per file block. Even though you are reading a local file, Spark internally first creates this kind of RDD and then maps over it, since a HadoopRDD is a pair RDD of <Long, Text> and you just want the String :-)

  • etc., etc.
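
As a quick illustration of that slicing, here is a hedged spark-shell sketch (the exact order of the split is an implementation detail of sc.parallelize):

// How sc.parallelize slices 15 elements across 4 partitions.
val sizes = sc.parallelize(1 to 15, 4).glom().map(_.length).collect()
println(sizes.mkString(", "))  // three partitions of 4 and one of 3, e.g. 3, 4, 4, 4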

In your example Spark internally does create different types of RDDs (a CoalescedRDD and a ShuffledRDD) while doing the repartitioning, but I think you get the general idea that different RDDs have different partitioning strategies :-)

Notice that the last part of the zip() doc mentions the map() operation. map() does not repartition the data, as it is a narrow transformation, so it would guarantee both conditions.
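
For example, a minimal sketch (data being the RDD from the question): an RDD derived from data via map() keeps exactly the same partitioning, so zipping the two always works:

// lengths has the same partitions and per-partition sizes as data,
// so both zip() conditions hold by construction.
val lengths = data.map(_.length)
val pairs   = data.zip(lengths)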

In this simple example, as was mentioned, you can simply use data.zipWithIndex. If you need something more complicated, then the new RDD for zip() should be created with map(), as mentioned above.
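
A sketch of that fix, shaped like the original tozip.zip(data) attempt (note that zipWithIndex yields 0-based Long indices, while the question's code used 1-based Ints):

// (line, index) pairs from zipWithIndex, swapped to (index, line).
val res = data.zipWithIndex.map { case (line, i) => (i, line) }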

