Spark: How to split an RDD[T] into Seq[RDD[T]] and preserve the ordering

This article describes how to split an RDD[T] into a Seq[RDD[T]] while preserving the ordering, and should be a useful reference for anyone facing the same problem.

Problem Description

How can I effectively split up an RDD[T] into a Seq[RDD[T]] / Iterable[RDD[T]] with n elements and preserve the original ordering?

I would like to be able to write something like this

RDD(1, 2, 3, 4, 5, 6, 7, 8, 9).split(3)

which should result in something like this:

Seq(RDD(1, 2, 3), RDD(4, 5, 6), RDD(7, 8, 9))

Does spark provide such a function? If not what is a performant way to achieve this?

val parts = rdd.count() / n                        // RDDs have no `length`; count() gives the number of elements
val rdds = rdd.zipWithIndex()
  .map { case (t, i) => (i - (i % parts), t) }     // key each element by the chunk it belongs to
  .groupByKey().collect().sortBy(_._1)             // pulls every group onto the driver, in chunk order
  .map { case (_, iter) => sc.parallelize(iter.toSeq) }

It doesn't look very fast, though.

Recommended Answer

You can, technically, do what you're suggesting. However, it really doesn't make sense in the context of leveraging a computing cluster to perform distributed processing of big data. It goes against the entire point of Spark in the first place. If you do a groupByKey and then try to extract these into separate RDDs, you are effectively pulling all of the data distributed in the RDD onto the driver and then re-distributing each one back to the cluster. If the driver can't load the entire data file, it won't be able to perform this operation either.

You should not be loading large data files onto the driver node from a local file system. You should move your file onto a distributed filesystem like HDFS or S3. Then you can load your single big data file onto your cluster by means of val lines = sc.textFile(...) into an RDD of lines. When you do this, each worker in the cluster will only load a portion of the file, which is possible because the data is already distributed across the cluster in the distributed filesystem.
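As a rough sketch of that loading step (the application name and the HDFS path below are illustrative placeholders, not from the original answer):

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative setup; the app name and path are assumptions.
val conf = new SparkConf().setAppName("split-example")
val sc   = new SparkContext(conf)

// Each worker reads only the file blocks assigned to its tasks, so no
// single node has to hold the entire file.
val lines = sc.textFile("hdfs:///data/big-input.txt")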

If you then need to organize the data into "batches" that are important to the functional processing of the data, you can key the data with an appropriate batch-identifier, like: val batches = lines.keyBy( line => lineBatchID(line) )

Each batch can then be reduced to a batch-level summary, and these summaries can be reduced into a single overall result.
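A minimal sketch of that keyBy-then-reduce pattern, assuming each line is a simple "batchId,value" record; lineBatchID, parseValue, and the use of a sum as the batch summary are all placeholder assumptions for illustration:

// Hypothetical helpers: extract a batch id and a numeric value from a line.
def lineBatchID(line: String): String = line.takeWhile(_ != ',')
def parseValue(line: String): Long    = line.split(',').last.trim.toLong

// Key every line by its batch, reduce each batch to a batch-level summary,
// then reduce the per-batch summaries into a single overall result.
val batches        = lines.keyBy(line => lineBatchID(line))
val batchSummaries = batches.mapValues(parseValue).reduceByKey(_ + _)
val overallTotal   = batchSummaries.values.reduce(_ + _)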

For the purposes of testing Spark code, it is fine to load a small sample of a data file onto a single machine. But when it comes to the full dataset, you should be leveraging a distributed filesystem in conjunction with a spark cluster to process this data.
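For that kind of local test, a sketch along these lines works (the local[*] master and the tiny in-memory sample are assumptions for illustration):

import org.apache.spark.{SparkConf, SparkContext}

// Run Spark inside a single JVM, using all local cores, for testing only.
val conf = new SparkConf().setAppName("local-test").setMaster("local[*]")
val sc   = new SparkContext(conf)

// A small in-memory sample stands in for the real distributed file.
val sampleLines = sc.parallelize(Seq("batchA,1", "batchA,2", "batchB,3"))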

That concludes this article on Spark: How to split an RDD[T] into Seq[RDD[T]] and preserve the ordering; hopefully the recommended answer above is helpful.
