Why is Files.list() parallel stream performing so much slower than using Collection.parallelStream()?

Problem description

The following code fragment is part of a method that gets a directory listing, calls an extract method on each file and serializes the resulting drug object to xml.

try(Stream<Path> paths = Files.list(infoDir)) {
    paths
        .parallel()
        .map(this::extract)
        .forEachOrdered(drug -> {
            try {
                marshaller.write(drug);
            } catch (JAXBException ex) {
                ex.printStackTrace();
            }
        });
}

Here is the exact same code doing the exact same thing but using a plain .list() call to get the directory listing and calling .parallelStream() on the resulting list.

Arrays.asList(infoDir.toFile().list())
    .parallelStream()
    .map(f -> infoDir.resolve(f))
    .map(this::extract)
    .forEachOrdered(drug -> {
        try {
            marshaller.write(drug);
        } catch (JAXBException ex) {
            ex.printStackTrace();
        }
    });

My machine is a quad core MacBook Pro, Java v 1.8.0_60 (build 1.8.0_60-b27).

I am processing ~ 7000 files. The averages of 3 runs:

First version: With .parallel(): 20 seconds. Without .parallel(): 41 seconds

Second version: With .parallelStream(): 12 seconds. With .stream(): 41 seconds.

Those 8 seconds in parallel mode seem like an enormous difference, given that the extract method, which reads from the stream and does all the heavy work, and the write call, which does the final writes, are unchanged.

Solution

The problem is that the current implementation of the Stream API, together with the current implementation of IteratorSpliterator for sources of unknown size, splits such sources into parallel tasks very badly. You were lucky to have more than 1024 files, otherwise you would have had no parallelization benefit at all. The current Stream API implementation takes into account the estimateSize() value returned from the Spliterator. The IteratorSpliterator of unknown size returns Long.MAX_VALUE before splitting, and its suffix always returns Long.MAX_VALUE as well. Its splitting strategy is the following:

  1. Define the current batch size. The current formula is to start with 1024 elements and increase arithmetically (2048, 3072, 4096, 5120 and so on) until the MAX_BATCH size is reached (which is 33554432 elements).
  2. Consume input elements (in your case Paths) into an array until the batch size is reached or the input is exhausted.
  3. Return an ArraySpliterator iterating over the created array as prefix, leaving itself as suffix (the short simulation right after this list replays these steps).
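
To make the batching arithmetic concrete, here is a small stand-alone sketch. It is not the actual JDK code, just a simulation of the sequence described above, run for a source of roughly 7000 elements of unknown size (the file count from the question):

import java.util.ArrayList;
import java.util.List;

// Rough simulation of IteratorSpliterator's batching arithmetic; it is NOT
// the real implementation, it only reproduces the prefix sizes it would create.
public class BatchSplitSimulation {
    private static final int BATCH_UNIT = 1024;      // first batch size
    private static final int MAX_BATCH = 33554432;   // upper bound on a batch

    public static void main(String[] args) {
        int remaining = 7000;   // roughly the number of files in the question
        int batch = 0;
        List<Integer> prefixSizes = new ArrayList<>();
        while (remaining > 0) {
            batch = Math.min(batch + BATCH_UNIT, MAX_BATCH);  // 1024, 2048, 3072, ...
            int chunk = Math.min(batch, remaining);           // size of the split-off prefix
            prefixSizes.add(chunk);
            remaining -= chunk;
        }
        System.out.println(prefixSizes);   // prints [1024, 2048, 3072, 856]
    }
}

The chunk sizes it prints match the splitting tree in the walkthrough below.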

Suppose you have 7000 files. The Stream API asks for the estimated size and IteratorSpliterator returns Long.MAX_VALUE. Ok, the Stream API asks the IteratorSpliterator to split; it collects 1024 elements from the underlying DirectoryStream into an array and splits into an ArraySpliterator (with estimated size 1024) and itself (with an estimated size that is still Long.MAX_VALUE). As Long.MAX_VALUE is much, much more than 1024, the Stream API decides to continue splitting the bigger part without even trying to split the smaller part. So the overall splitting tree goes like this:

                     IteratorSpliterator (est. MAX_VALUE elements)
                           |                    |
ArraySpliterator (est. 1024 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 2048 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 3072 elements)   IteratorSpliterator (est. MAX_VALUE elements)
                                           |        |
                           /---------------/        |
                           |                        |
ArraySpliterator (est. 856 elements)    IteratorSpliterator (est. MAX_VALUE elements)
                                                    |
                                        (split returns null: refuses to split anymore)

So after that you have five parallel tasks to be executed, actually containing 1024, 2048, 3072, 856 and 0 elements. Note that even though the last chunk has 0 elements, it still reports an estimated Long.MAX_VALUE elements, so the Stream API will send it to the ForkJoinPool as well. The bad thing is that the Stream API considers further splitting of the first four tasks useless, because their estimated sizes are much smaller. So what you get is a very uneven split of the input, which utilizes at most four CPU cores (even if you have many more). If your per-element processing takes roughly the same time for every element, then the whole process waits for the biggest part (3072 elements) to complete. So the maximum speedup you can get is 7000/3072 = 2.28x. Thus, if sequential processing takes 41 seconds, the parallel stream will take around 41/2.28 = 18 seconds (which is close to your actual numbers).
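
As a quick sanity check of that estimate, here is the same arithmetic wrapped in a runnable snippet (the 7000, 3072 and 41-second figures are simply the numbers quoted above):

// Upper bound on the speedup when the largest chunk dominates the run time.
public class SpeedupBound {
    public static void main(String[] args) {
        int totalElements = 7000;        // ~7000 files
        int largestChunk = 3072;         // biggest piece of the uneven split
        double sequentialSeconds = 41;   // measured sequential time

        double maxSpeedup = (double) totalElements / largestChunk;   // ~2.28
        System.out.printf("max speedup ~= %.2fx, expected parallel time ~= %.0f s%n",
                maxSpeedup, sequentialSeconds / maxSpeedup);         // ~18 s
    }
}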

Your work-around solution is completely fine. Note that with Files.list().parallel() you also have all the input Path elements stored in memory (in ArraySpliterator objects), so you will not waste more memory by manually dumping them into a List. Array-backed list implementations like ArrayList (which is what Collectors.toList() currently creates) can split evenly without any problems, which results in additional speed-up.
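
For reference, one way to write that variant is sketched below. This is a minimal sketch rather than code from the answer; it assumes the same infoDir, extract and marshaller as in the question, plus the usual java.nio.file and java.util.stream imports:

// Materialize the directory listing into an array-backed List first,
// then let the list's spliterator split the work evenly in parallel.
try (Stream<Path> paths = Files.list(infoDir)) {
    List<Path> files = paths.collect(Collectors.toList());
    files.parallelStream()
         .map(this::extract)
         .forEachOrdered(drug -> {
             try {
                 marshaller.write(drug);
             } catch (JAXBException ex) {
                 ex.printStackTrace();
             }
         });
}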

Why is such a case not optimized? Of course, it is not an impossible problem (though the implementation could be quite tricky); it just does not seem to be a high-priority problem for the JDK developers. There have been several discussions on this topic on the mailing lists. You may read Paul Sandoz's message here where he comments on my optimization effort.
