Apache Spark: Effectively using mapPartitions in Java
Question
In the early-release textbook titled High Performance Spark, the developers of Spark note that:
To allow Spark the flexibility to spill some records to disk, it is important to represent your functions inside of mapPartitions in such a way that your functions don't force loading the entire partition in-memory (e.g. implicitly converting to a list). Iterators have many methods we can write functional style transformations on, or you can construct your own custom iterator. When a transformation directly takes and returns an iterator without forcing it through another collection, we call these iterator-to-iterator transformations.
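Before going further, it helps to pin down what such a transformation can look like in plain Java. The following is a minimal sketch, not an API from the book or from Spark; MappingIterator is an illustrative name:

import java.util.Iterator;
import java.util.function.Function;

// A lazy iterator-to-iterator transformation: each record is transformed
// only when the consumer pulls it, so the partition is never buffered.
class MappingIterator<A, B> implements Iterator<B> {
    private final Iterator<A> source;
    private final Function<A, B> f;

    MappingIterator(Iterator<A> source, Function<A, B> f) {
        this.source = source;
        this.f = f;
    }

    @Override
    public boolean hasNext() {
        return source.hasNext();
    }

    @Override
    public B next() {
        return f.apply(source.next());
    }
}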
However, the textbook lacks good examples using mapPartitions or similar variations of the method, and there are few good code examples online, most of which are in Scala. For example, we see this Scala code using mapPartitions written by zero323 on How to add columns into org.apache.spark.sql.Row inside of mapPartitions:
def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show
Unfortunately, Java doesn't provide anything as nice as iter.map(...) for iterators. So it begs the question: how can one effectively use iterator-to-iterator transformations with mapPartitions without entirely spilling an RDD to disk as a list?
JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
    // Materializes the entire partition into an ArrayList before returning,
    // which is exactly what the book warns against.
    ArrayList<OutObj> out = new ArrayList<>();
    while (iter.hasNext()) {
        InObj current = iter.next();
        out.add(someChange(current));
    }
    return out.iterator();
});
This seems to be the general syntax for using mapPartitions in Java examples, but I don't see how this would be the most efficient, supposing you have a JavaRDD with tens of thousands of records (or even more, since Spark is for big data). You'd eventually end up with a list of all the objects in the iterator, just to turn it back into an iterator (which suggests that a map function of some sort would be much more efficient here).
Note: while these 8 lines of code using mapPartitions could be written as 1 line with a map or flatMap, I'm intentionally using mapPartitions to take advantage of the fact that it operates over each partition rather than each element in the RDD.
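To make that advantage concrete, here is a hedged sketch of a common per-partition pattern: paying an expensive setup cost once per partition instead of once per record. ExpensiveParser and its parse method are hypothetical stand-ins, and the MappingIterator wrapper from the sketch above keeps this an iterator-to-iterator transformation:

JavaRDD<OutObj> out = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
    // Hypothetical resource that is expensive to construct: built once per
    // partition and reused for every record in it.
    ExpensiveParser parser = new ExpensiveParser();
    // Lazy wrapper: records are transformed one at a time, nothing is buffered.
    return new MappingIterator<>(iter, parser::parse);
});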
Any thoughts?
Answer
One way to prevent forcing the "materialization" of the entire partition is by converting the Iterator into a Stream, and then using Stream's functional API (e.g. the map function).
How to convert an iterator to a stream? suggests a few good ways to convert an Iterator into a Stream, so taking one of the options suggested there we can end up with:
rdd.mapPartitions((Iterator<InObj> iter) -> {
    Iterable<InObj> iterable = () -> iter;
    return StreamSupport.stream(iterable.spliterator(), false)
            .map(s -> transformRow(s)) // or whatever transformation
            .iterator();
});
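A note on the second argument: the false passed to StreamSupport.stream requests a sequential rather than parallel stream, which is the right choice here, since each partition's iterator is consumed by a single Spark task anyway.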
This should be an "iterator-to-iterator" transformation, because all the intermediate APIs used (Iterable, Stream) are lazily evaluated.
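That laziness is easy to confirm outside Spark; the following minimal, self-contained sketch (class and variable names are mine) shows that the map function runs only when the resulting iterator is actually pulled:

import java.util.Arrays;
import java.util.Iterator;
import java.util.stream.StreamSupport;

public class LazinessDemo {
    public static void main(String[] args) {
        Iterator<Integer> source = Arrays.asList(1, 2, 3).iterator();
        Iterable<Integer> iterable = () -> source; // single-use Iterable view

        Iterator<Integer> mapped = StreamSupport.stream(iterable.spliterator(), false)
                .map(x -> {
                    System.out.println("mapping " + x); // runs only on demand
                    return x * 10;
                })
                .iterator();

        // Nothing has been mapped at this point.
        System.out.println("iterator built, no mapping done");
        System.out.println(mapped.next()); // prints "mapping 1", then 10
    }
}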
EDIT: I haven't tested it myself, but the OP commented, and I quote, that "there is no efficiency increase by using a Stream over a list". I don't know why that is, and I don't know if that would be true in general, but worth mentioning.