How to partition an RDD


Question


I have a text file consisting of a large number of random floating-point values separated by spaces. I am loading this file into an RDD in Scala. How does this RDD get partitioned?

Also, is there any method to generate custom partitions such that all partitions have an equal number of elements, along with an index for each partition?

val dRDD = sc.textFile("hdfs://master:54310/Data/input*")
val keyval = dRDD.map(x => process(x.trim().split(' ').map(_.toDouble), query_norm, m, r))

Here I am loading multiple text files from HDFS, and process is a function I am calling. Can I have a solution using mapPartitionsWithIndex, and how can I access that index inside the process function? map shuffles the partitions.

Solution

How does an RDD get partitioned?

By default, one partition is created for each HDFS block, which is 64MB by default (see the Spark documentation for details).
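For the Scala RDD from the question, you can check how many partitions were actually created; a minimal sketch using the path from the question:

  // one partition per HDFS block by default
  val dRDD = sc.textFile("hdfs://master:54310/Data/input*")
  println(dRDD.partitions.length)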

How to balance my data across partitions?

First, take a look at the three ways you can repartition your data:

1) Pass a second parameter, the desired minimum number of partitions for your RDD, into textFile(), but be careful:

In [14]: lines = sc.textFile("data")

In [15]: lines.getNumPartitions()
Out[15]: 1000

In [16]: lines = sc.textFile("data", 500)

In [17]: lines.getNumPartitions()
Out[17]: 1434

In [18]: lines = sc.textFile("data", 5000)

In [19]: lines.getNumPartitions()
Out[19]: 5926

As you can see, [16] doesn't do what one would expect, since the number of partitions the RDD already has is greater than the minimum number of partitions we requested.
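The same minimum-partitions hint works from Scala, which is what the question uses; a minimal sketch with the path from the question:

  // ask Spark for at least 500 partitions; as shown above,
  // the actual number may end up higher than the hint
  val dRDD = sc.textFile("hdfs://master:54310/Data/input*", 500)
  println(dRDD.partitions.length)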

2) Use repartition(), like this:

In [22]: lines = lines.repartition(10)

In [23]: lines.getNumPartitions()
Out[23]: 10

Warning: This will invoke a shuffle and should be used when you want to increase the number of partitions your RDD has.

From the docs:

The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

3) Use coalesce(), like this:

In [25]: lines = lines.coalesce(2)

In [26]: lines.getNumPartitions()
Out[26]: 2

Here, Spark knows that you will shrink the RDD and takes advantage of it: unlike repartition(), coalesce() can avoid a full shuffle when reducing the partition count. Read more about repartition() vs coalesce().


But will all this guarantee that your data will be perfectly balanced across your partitions? Not really, as I experienced in How to balance my data across the partitions?
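As for the mapPartitionsWithIndex part of the question: it hands you the partition index together with an iterator over that partition's elements, so you can pass the index on to process yourself. A minimal sketch, assuming process is extended with an extra index parameter (query_norm, m and r are the values from the question):

  val dRDD = sc.textFile("hdfs://master:54310/Data/input*")

  val keyval = dRDD.mapPartitionsWithIndex { (idx, iter) =>
    // idx is this partition's index; iter iterates over its lines
    iter.map(x => process(x.trim().split(' ').map(_.toDouble), query_norm, m, r, idx))
  }

mapPartitionsWithIndex itself does not shuffle anything; if you need roughly equal-sized partitions first, repartition() the RDD before calling it.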
