How to create Spark RDD from an iterator?


Problem description

To make it clear, I am not looking for an RDD from an array/list like

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7); // sample
JavaRDD<Integer> rdd = new JavaSparkContext().parallelize(list);

How can I create a Spark RDD from a Java iterator without completely buffering it in memory?

Iterator<Integer> iterator = Arrays.asList(1, 2, 3, 4).iterator(); //sample iterator for illustration
JavaRDD<Integer> rdd = new JavaSparkContext().what("?", iterator); //the Question

Additional question:

Is it a requirement for the source to be re-readable (or capable of being read many times) in order to provide resilience for an RDD? In other words, since iterators are fundamentally read-once, is it even possible to create a Resilient Distributed Dataset (RDD) from an iterator?

Recommended answer

As somebody else said, you could do something with Spark Streaming, but as for pure Spark you can't, because what you are asking for goes against Spark's model. Let me explain.

To distribute and parallelize work, Spark has to divide it into chunks. When reading from HDFS, that 'chunking' is done for Spark by HDFS, since HDFS files are organized in blocks, and Spark will generally generate one task per block. An iterator, however, only provides sequential access to your data, so it is impossible for Spark to organize it into chunks without reading all of it into memory.
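To make that concrete, the closest a pure-Spark workaround can get is to drain the iterator in fixed-size batches on the driver, parallelize each batch, and union the pieces. The sketch below (fromIterator is a hypothetical helper, not a Spark API) illustrates this; note that it does not escape the limitation described above, because every element still passes through, and is retained in, driver memory.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class IteratorToRdd {
    // Drain the iterator in fixed-size batches and union the resulting RDDs.
    // Each batch is fully materialized in driver memory before parallelize(),
    // so this bounds peak buffering per batch but not the total footprint.
    static JavaRDD<Integer> fromIterator(JavaSparkContext sc,
                                         Iterator<Integer> it,
                                         int batchSize) {
        JavaRDD<Integer> result = sc.emptyRDD();
        List<Integer> batch = new ArrayList<>(batchSize);
        while (it.hasNext()) {
            batch.add(it.next());
            if (batch.size() == batchSize) {
                result = result.union(sc.parallelize(batch));
                batch = new ArrayList<>(batchSize);
            }
        }
        if (!batch.isEmpty()) {
            result = result.union(sc.parallelize(batch));
        }
        return result;
    }

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "iterator-demo");
        Iterator<Integer> iterator = Arrays.asList(1, 2, 3, 4, 5, 6, 7).iterator();
        JavaRDD<Integer> rdd = fromIterator(sc, iterator, 3);
        System.out.println(rdd.collect()); // [1, 2, 3, 4, 5, 6, 7]
        sc.stop();
    }
}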

It may be possible to build an RDD that has a single iterable partition, but even then there is no way to know whether the implementation of the Iterable could be sent to the workers. When you use sc.parallelize(), Spark creates partitions that implement Serializable so that each partition can be sent to a different worker. The iterable could be backed by a network connection or by a file in the local filesystem, so it cannot be sent to the workers unless it is buffered in memory.
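To illustrate that serialization constraint, the sketch below (illustrative only, not from the original answer; data.txt is a placeholder file name) captures a driver-local BufferedReader in a task closure. Spark must serialize the closure to ship it to executors, and since java.io.BufferedReader is not Serializable, the job fails before any task runs.

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Arrays;

import org.apache.spark.api.java.JavaSparkContext;

public class NotShippable {
    public static void main(String[] args) throws Exception {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "closure-demo");
        BufferedReader reader = new BufferedReader(new FileReader("data.txt"));
        // Fails with org.apache.spark.SparkException: Task not serializable,
        // caused by java.io.NotSerializableException: java.io.BufferedReader,
        // because the lambda captures the non-serializable 'reader'.
        sc.parallelize(Arrays.asList(1, 2, 3))
          .map(i -> i + reader.readLine().length())
          .collect();
        sc.stop();
    }
}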

