Why is Files.list() parallel stream performing so much slower than using Collection.parallelStream()?

Question

The following code fragment is part of a method that gets a directory listing, calls an extract method on each file and serializes the resulting drug object to xml.

try(Stream<Path> paths = Files.list(infoDir)) {
    paths
        .parallel()
        .map(this::extract)
        .forEachOrdered(drug -> {
            try {
                marshaller.write(drug);
            } catch (JAXBException ex) {
                ex.printStackTrace();
            }
        });
}

Here is the exact same code doing the exact same thing but using a plain .list() call to get the directory listing and calling .parallelStream() on the resulting list.

Arrays.asList(infoDir.toFile().list())
    .parallelStream()
    .map(f -> infoDir.resolve(f))
    .map(this::extract)
    .forEachOrdered(drug -> {
        try {
            marshaller.write(drug);
        } catch (JAXBException ex) {
            ex.printStackTrace();
        }
    });

My machine is a quad core MacBook Pro, Java v 1.8.0_60 (build 1.8.0_60-b27).

I am processing ~ 7000 files. The averages of 3 runs:

First version: With .parallel(): 20 seconds. Without .parallel(): 41 seconds

Second version: With .parallelStream(): 12 seconds. With .stream(): 41 seconds.

Those 8 seconds in parallel mode seem like an enormous difference given that the extract method that reads from the stream and does all the heavy work and the write call doing the final writes are unchanged.

Answer

The problem is that the current implementation of the Stream API, together with the current implementation of IteratorSpliterator for sources of unknown size, splits such sources into parallel tasks very poorly. You were lucky to have more than 1024 files; otherwise you would have had no parallelization benefit at all. The Stream API implementation takes into account the estimateSize() value returned by the Spliterator. An IteratorSpliterator of unknown size returns Long.MAX_VALUE before splitting, and its suffix always returns Long.MAX_VALUE as well. Its splitting strategy is the following:


  1. 定义当前批量大小。电流式是启动与1024个元素和算术增加(2048,3072,4096,5120等),直到 MAX_BATCH 达到尺寸(它是33554432个元件)。

  2. 将输入元素(在您的情况下为Paths)消耗到数组中,直到达到批量大小或输入用尽。

  3. 返回 ArraySpliterator 将创建的数组作为前缀进行迭代,将其自身保留为后缀。

  1. Define the current batch size. The current formula starts with 1024 elements and increases arithmetically (2048, 3072, 4096, 5120 and so on) until the MAX_BATCH size is reached (which is 33554432 elements).
  2. Consume input elements (in your case Paths) into an array until the batch size is reached or the input is exhausted.
  3. Return an ArraySpliterator iterating over the created array as the prefix, leaving itself as the suffix.
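
The three steps above can be observed directly by wrapping a plain iterator in an unknown-size spliterator (the same Spliterators.spliteratorUnknownSize machinery that backs Files.list() in JDK 8) and calling trySplit() by hand. A minimal sketch, assuming JDK 8+ behavior; the 7000-element count mirrors the question:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.IntStream;

public class SplitDemo {

    // Collect the estimated size of each prefix that trySplit() peels off
    // an unknown-size spliterator over n elements.
    static List<Long> prefixSizes(int n) {
        Spliterator<Integer> suffix = Spliterators.spliteratorUnknownSize(
                IntStream.range(0, n).boxed().iterator(), 0);
        List<Long> sizes = new ArrayList<>();
        Spliterator<Integer> prefix;
        while ((prefix = suffix.trySplit()) != null) {
            // The suffix keeps reporting suffix.estimateSize() == Long.MAX_VALUE.
            sizes.add(prefix.estimateSize());
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Batches grow arithmetically: 1024, 2048, 3072, then the 856 leftover.
        System.out.println(prefixSizes(7000));
    }
}
```

Running this should show exactly the uneven chunks described below: [1024, 2048, 3072, 856].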

Suppose you have 7000 files. The Stream API asks for the estimated size; IteratorSpliterator returns Long.MAX_VALUE. Ok, the Stream API asks the IteratorSpliterator to split; it collects 1024 elements from the underlying DirectoryStream into an array and splits into an ArraySpliterator (with estimated size 1024) and itself (whose estimated size is still Long.MAX_VALUE). Since Long.MAX_VALUE is much, much more than 1024, the Stream API decides to keep splitting the bigger part without even trying to split the smaller part. So the overall splitting tree goes like this:

                     IteratorSpliterator (est. MAX_VALUE elements)
                           |                    |
ArraySpliterator (est. 1024 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 2048 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 3072 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 856 elements)    IteratorSpliterator (est. MAX_VALUE elements)
                                                    |
                                        (split returns null: refuses to split anymore)

So after that you have five parallel tasks to be executed, actually containing 1024, 2048, 3072, 856 and 0 elements. Note that even though the last chunk has 0 elements, it still reports an estimated Long.MAX_VALUE elements, so the Stream API will submit it to the ForkJoinPool as well. The bad part is that the Stream API considers further splitting of the first four tasks useless, since their estimated sizes are much smaller. So what you get is a very uneven split of the input that utilizes at most four CPU cores (even if you have more). If the per-element processing takes roughly the same time for every element, the whole process waits for the biggest part (3072 elements) to complete. So the maximum speedup you can get is 7000/3072 = 2.28x. Thus, if sequential processing takes 41 seconds, the parallel stream will take around 41/2.28 = 18 seconds (which is close to your actual numbers).
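
The arithmetic behind that estimate, as a sketch (the chunk sizes are the ones derived above; the 41 s is the question's sequential timing):

```java
public class SpeedupEstimate {
    public static void main(String[] args) {
        // Uneven chunks produced for 7000 elements: 1024, 2048, 3072, 856.
        // With uniform per-element cost, wall time is dominated by the
        // largest chunk, so the best possible speedup is total / largest.
        int total = 7000;
        int largest = 3072;
        double maxSpeedup = (double) total / largest;   // ≈ 2.28
        double sequentialSeconds = 41;
        double parallelSeconds = sequentialSeconds / maxSpeedup;   // ≈ 18 s
        System.out.printf("%.2fx -> %.0f s%n", maxSpeedup, parallelSeconds);
    }
}
```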

Your work-around solution is completely fine. Note that with Files.list().parallel() you also have all the input Path elements stored in memory (inside ArraySpliterator objects), so you do not waste more memory by manually dumping them into a List. Array-backed list implementations such as ArrayList (which is currently what Collectors.toList() creates) can split evenly without any problems, which results in additional speed-up.
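
A minimal, self-contained sketch of that variant: collect the listing into a list first, then parallelize over the list. The temp directory and Path::getFileName are hypothetical stand-ins for the question's infoDir and this::extract, so the example runs anywhere:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ListThenParallel {

    // Materialize the directory listing into an ArrayList first; the
    // list's array-backed spliterator has a known size and splits evenly.
    static List<Path> listAll(Path dir) throws IOException {
        try (Stream<Path> paths = Files.list(dir)) {
            return paths.collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the question's infoDir: a temp directory with a few files.
        Path infoDir = Files.createTempDirectory("info");
        for (int i = 0; i < 5; i++) {
            Files.createFile(infoDir.resolve("drug" + i + ".xml"));
        }

        // .parallelStream() now splits a known-size source evenly;
        // Path::getFileName stands in for the question's this::extract.
        long processed = listAll(infoDir).parallelStream()
                .map(Path::getFileName)
                .count();
        System.out.println(processed);   // 5
    }
}
```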

Why is such a case not optimized? Of course it's not an impossible problem (though the implementation could be quite tricky); it just does not seem to be a high-priority problem for the JDK developers. There were several discussions on this topic on the mailing lists. You can read Paul Sandoz's message here, where he comments on my optimization effort.
