Apache Spark: Effectively using mapPartitions in Java

Question

In the currently early-release textbook titled High Performance Spark, the developers of Spark note that:

To allow Spark the flexibility to spill some records to disk, it is important to represent your functions inside of mapPartitions in such a way that your functions don’t force loading the entire partition in-memory (e.g. implicitly converting to a list). Iterators have many methods we can write functional style transformations on, or you can construct your own custom iterator. When a transformation directly takes and returns an iterator without forcing it through another collection, we call these iterator-to-iterator transformations.

However, the textbook lacks good examples using mapPartitions or similar variations of the method. And there are few good code examples online, most of which are in Scala. For example, we see this Scala code using mapPartitions written by zero323 on How to add columns into org.apache.spark.sql.Row inside of mapPartitions.

def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show

Unfortunately, Java doesn't provide anything as nice as iter.map(...) for iterators. So it begs the question, how can one effectively use the iterator-to-iterator transformations with mapPartitions without entirely spilling an RDD to disk as a list?

JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
    ArrayList<OutObj> out = new ArrayList<>();
    while(iter.hasNext()) {
        InObj current = iter.next();
        out.add(someChange(current));
    }
    return out.iterator();
});

This seems to be the general syntax for using mapPartitions in Java examples, but I don't see how this would be the most efficient, supposing you have a JavaRDD with tens of thousands of records (or even more... since Spark is for big data). You'd eventually end up with a list of all the objects in the iterator, just to turn it back into an iterator (which suggests that a map function of some sort would be much more efficient here).

Note: while these 8 lines of code using mapPartitions could be written as 1 line with a map or flatMap, I'm intentionally using mapPartitions to take advantage of the fact that it operates over each partition rather than each element in the RDD.
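
To illustrate that point with a hypothetical sketch (ExpensiveSetup and someChange are placeholder names, and it still has the list-materialization problem described above): with mapPartitions, any per-partition setup cost is paid once per partition rather than once per record, which a plain map over elements cannot offer.

JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
    // Hypothetical expensive resource (parser, connection, ...): with mapPartitions
    // it is built once per partition instead of once per element as map would require.
    ExpensiveSetup setup = new ExpensiveSetup();
    ArrayList<OutObj> out = new ArrayList<>();
    while (iter.hasNext()) {
        out.add(someChange(setup, iter.next()));
    }
    return out.iterator(); // still materializes the whole partition as a list
});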

Any ideas?

Recommended answer

One way to prevent forcing the "materialization" of the entire partition is by converting the Iterator into a Stream, and then using Stream's functional API (e.g. map function).

How to convert an iterator to a stream? suggests a few good ways to convert an Iterator into a Stream, so taking one of the options suggested there we can end up with:

rdd.mapPartitions((Iterator<InObj> iter) -> {
    // Wrap the partition's iterator as an Iterable so a Stream can be built over it
    Iterable<InObj> iterable = () -> iter;
    // The Stream pulls records one at a time; nothing is collected into a list
    return StreamSupport.stream(iterable.spliterator(), false)
            .map(s -> transformRow(s)) // or whatever transformation
            .iterator();
});

Which should be an "Iterator-to-Iterator" transformation, because all the intermediate APIs used (Iterable, Stream) are lazily evaluated.

EDIT: I haven't tested it myself, but the OP commented, and I quote, that "there is no efficiency increase by using a Stream over a list". I don't know why that is, and I don't know if that would be true in general, but worth mentioning.
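
The book excerpt quoted in the question also mentions constructing your own custom iterator; as a sketch of that route (untested here, with transformRow again standing in for the real per-record work), an anonymous Iterator keeps the transformation lazy without involving Streams at all:

rdd.mapPartitions((Iterator<InObj> iter) -> new Iterator<OutObj>() {
    @Override
    public boolean hasNext() {
        // Delegates to the underlying partition iterator, so nothing is buffered.
        return iter.hasNext();
    }

    @Override
    public OutObj next() {
        // Transforms one record at a time as it is requested downstream.
        return transformRow(iter.next());
    }
});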
