Nested Java 8 parallel forEach loop performs poorly. Is this behavior expected?

Problem description

Note: I already addressed this problem in another SO post, "Using a semaphore inside a nested Java 8 parallel stream action may DEADLOCK. Is this a bug?", but the title of that post suggested that the problem is related to the use of a semaphore, which somewhat distracted the discussion. I am creating this one to stress that nested loops might have a performance issue, although both problems likely have a common cause (and maybe because it took me a lot of time to figure out this problem). (I don't see it as a duplicate, because it stresses another symptom, but if you do, just delete it.)

Problem: If you nest two Java 8 stream.parallel().forEach loops and all tasks are independent, stateless, etc. (except for being submitted to the common FJ pool), then nesting a parallel loop inside a parallel loop performs much worse than nesting a sequential loop inside a parallel loop. Even worse: if the operation containing the inner loop is synchronized, you will get a DEADLOCK.

Demonstration of the performance problem

Without the 'synchronized' you can still observe a performance problem. You can find demo code for this at: http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachTest.java (see the JavaDoc there for a more detailed description).

Our setup here is as follows: We have a nested stream.parallel().forEach().

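  • The inner loop is independent (stateless, no interference, etc., except for the use of the common pool) and consumes 1 second in total in the worst case, i.e., if processed sequentially.
  • Half of the outer-loop tasks consume 10 seconds before that loop.
  • The other half consume 10 seconds after that loop.
  • Hence every outer-loop task consumes 11 seconds in total (worst case).
  • We have a boolean which allows switching the inner loop from parallel() to sequential().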

Now: submitting 24 outer-loop-tasks to a pool with parallelism 8 we would expect 24/8 * 11 = 33 seconds at best (on an 8 core or better machine).
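
The 33-second estimate is round-based arithmetic: if all outer tasks take the same time and are packed perfectly onto the workers, there are ceil(24/8) = 3 rounds of 11 seconds each. The sketch below only reproduces that calculation; the class and method names are illustrative, and the integer ceiling is an assumption added so the formula also covers task counts that are not a multiple of the parallelism.

```java
public class ExpectedRuntime {

    // Best-case wall-clock time, assuming every outer task takes the same time and
    // the tasks are packed perfectly onto `parallelism` workers:
    // ceil(outerTasks / parallelism) rounds of secondsPerOuterTask each.
    public static double bestCaseSeconds(int outerTasks, int parallelism, double secondsPerOuterTask) {
        int rounds = (outerTasks + parallelism - 1) / parallelism; // integer ceiling for positive values
        return rounds * secondsPerOuterTask;
    }

    public static void main(String[] args) {
        // 10 s of outer-loop work plus 1 s for the inner loop in the worst (sequential) case.
        double secondsPerOuterTask = 10.0 + 1.0;
        System.out.println(bestCaseSeconds(24, 8, secondsPerOuterTask) + " s"); // prints 33.0 s
    }
}
```

Any measured time well above this bound, such as the 92 seconds reported for the parallel inner loop, means the workers are not being kept busy with useful work.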

The results are:

  • With the inner loop sequential: 33 seconds.
  • With the inner loop parallel: >80 seconds (I got 92 seconds).

Question: Can you confirm this behavior? Is this something one would expect from the framework? (I am a bit more careful now about claiming that this is a bug, but I personally believe that it is due to a bug in the implementation of ForkJoinTask. Remark: I have posted this to concurrency-interest (see http://cs.oswego.edu/pipermail/concurrency-interest/2014-May/012652.html ), but so far I have not received confirmation from there.)

Demonstration of the deadlock

The following code will DEADLOCK:

    // Outer loop
    IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
        doWork();
        synchronized(this) {
            // Inner loop
            IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
                doWork();
            });
        }
    });

where numberOfTasksInOuterLoop = 24, numberOfTasksInInnerLoop = 240, outerLoopOverheadFactor = 10000 and doWork is some stateless CPU burner.

You can find complete demo code at http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachAndSynchronization.java (see the JavaDoc there for a more detailed description).

Is this behavior expected? Note that the documentation on Java parallel streams does not mention any issue with nesting or synchronization. Also, the fact that both use a common fork-join-pool is not mentioned.
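
One way to see that parallel streams run on the common ForkJoinPool is to record which threads execute the stream elements. The sketch below (class and method names are illustrative) collects the thread names seen inside a parallel forEach; on a standard JDK these are the calling thread plus threads named ForkJoinPool.commonPool-worker-N. The common pool's parallelism can be read with ForkJoinPool.getCommonPoolParallelism() and, assuming it is set before the pool is first used, configured through the documented system property java.util.concurrent.ForkJoinPool.common.parallelism (for example -Djava.util.concurrent.ForkJoinPool.common.parallelism=8).

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CommonPoolThreads {

    // Returns the names of all threads that processed at least one element
    // of a parallel IntStream.forEach over 0..n-1.
    public static Set<String> threadsUsedByParallelForEach(int n) {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        IntStream.range(0, n).parallel()
                 .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("calling thread: " + Thread.currentThread().getName());
        // Typically the calling thread plus some "ForkJoinPool.commonPool-worker-N" threads.
        System.out.println("threads used: " + threadsUsedByParallelForEach(100_000));
    }
}
```

Because an inner parallel stream started from inside an outer one draws on this same pool, the two loops compete for the same fixed set of workers.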

Update

Another test of the performance issue can be found at http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachBenchmark.java; this test comes without any blocking operation (no Thread.sleep and no synchronized). I compiled some more remarks here: http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

Update 2

It appears that this problem and the more severe DEADLOCK with semaphores have been fixed in Java 8u40.

Recommended answer

The problem is that the rather limited parallelism you have configured is eaten up by the outer stream processing: if you say that you want eight threads and process a stream of more than eight items with parallel(), it will create eight worker threads and let them process items.

Then, within your consumer, you are processing another stream using parallel(), but there are no worker threads left. Since the worker threads are blocked waiting for the end of the inner stream processing, the ForkJoinPool has to create new worker threads, which violates your configured parallelism. It seems to me that it does not recycle these extra threads but lets them die right after processing. So within your inner processing, new threads are created and disposed of, which is an expensive operation.

You might see it as a flaw that the initiating threads do not contribute to the computation of a parallel stream but just wait for the result. But even if that were fixed, you would still have a general problem that is hard (if ever possible) to fix:

Whenever the ratio of worker threads to outer stream items is low, the implementation will use all of them for the outer stream, as it doesn't know that the stream is an outer stream. So executing an inner stream in parallel requests more worker threads than are available. Using the caller thread to contribute to the computation could fix it in a way that makes the performance equal to the serial computation, but getting an advantage from parallel execution here does not work well with the concept of a fixed number of worker threads.

Note that you are only scratching the surface of this problem here, as you have rather balanced processing times for the items. If the processing times of the inner items and the outer items diverge (compared to items on the same level), the problem will be even worse.

Update: by profiling and looking at the code, it seems that the ForkJoinPool does attempt to use the waiting thread for "work stealing", but it uses different code depending on whether the thread is a worker thread or some other thread. As a result, a worker thread is actually waiting about 80% of the time and doing little to no work, while other threads really contribute to the computation…
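
The worker/non-worker distinction mentioned in this update is visible through public API: ForkJoinTask.inForkJoinPool() returns true only when called from a ForkJoinPool worker thread running a pool computation. The minimal sketch below (class and method names are hypothetical) does not reproduce the profiling result; it only shows that the same call answers differently in an ordinary caller thread and inside a task on the common pool. The result is handed back through a CompletableFuture so that the caller just blocks and never executes the pool task itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class WorkerCheck {

    // True if the current thread is a ForkJoinWorkerThread running a pool computation.
    public static boolean currentThreadIsPoolWorker() {
        return ForkJoinTask.inForkJoinPool();
    }

    // Runs the same check as a task on the common pool and returns its answer.
    public static boolean checkInsideCommonPool() {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        ForkJoinPool.commonPool().execute(() -> result.complete(ForkJoinTask.inForkJoinPool()));
        return result.join(); // plain blocking wait in the caller
    }

    public static void main(String[] args) {
        System.out.println("caller is a pool worker:  " + currentThreadIsPoolWorker()); // false for main
        System.out.println("task on common pool says: " + checkInsideCommonPool());     // true
    }
}
```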

Update 2: for completeness, here is the simple parallel execution approach described in the comments. Since it enqueues every item, it is expected to have too much overhead when the execution time of a single item is rather small. So it's not a sophisticated solution but rather a demonstration that it is possible to handle long-running tasks without much magic…

import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.*;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NestedParallelForEachTest1 {
    static final boolean isInnerStreamParallel = true;

    // Setup: Inner loop task 0.01 sec in worst case. Outer loop task: 10 sec + inner loop. This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec.
    static final int numberOfTasksInOuterLoop = 24;  // In real applications this can be a large number (e.g. > 1000).
    static final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
    static final int concurrentExecutionsLimitForStreams = 8;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println(System.getProperty("java.version")+" "+System.getProperty("java.home"));
        new NestedParallelForEachTest1().testNestedLoops();
        E.shutdown();
    }

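    // Fixed pool of concurrentExecutionsLimitForStreams threads. The SynchronousQueue
    // buffers nothing, so when all workers are busy a submitted task is rejected and
    // the rejection handler (r,e)->r.run() executes it in the submitting thread instead.
    final static ThreadPoolExecutor E = new ThreadPoolExecutor(
        concurrentExecutionsLimitForStreams, concurrentExecutionsLimitForStreams,
        2, TimeUnit.MINUTES, new SynchronousQueue<>(), (r,e)->r.run() );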

    // Submits one task per stream element, then waits for all of them (see waitOrHelp).
    public static void parallelForEach(IntStream s, IntConsumer c) {
        s.mapToObj(i->E.submit(()->c.accept(i))).collect(Collectors.toList())
         .forEach(NestedParallelForEachTest1::waitOrHelp);
    }
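    // Busy-waits until f is done, running a queued task whenever poll() finds one.
    // (With a SynchronousQueue poll() rarely finds anything: tasks that no idle
    // worker accepted have already been run by their submitter.)
    static void waitOrHelp(Future<?> f) {
        while(!f.isDone()) {
            Runnable r=E.getQueue().poll();
            if(r!=null) r.run();
        }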
        try { f.get(); }
        catch(InterruptedException ex) { throw new RuntimeException(ex); }
        catch(ExecutionException eex) {
            Throwable t=eex.getCause();
            if(t instanceof RuntimeException) throw (RuntimeException)t;
            if(t instanceof Error) throw (Error)t;
            throw new UndeclaredThrowableException(t);
        }
    }
    public void testNestedLoops(NestedParallelForEachTest1 this) {
        long start = System.nanoTime();
        // Outer loop
        parallelForEach(IntStream.range(0,numberOfTasksInOuterLoop), i -> {
            if(i < 10) sleep(10 * 1000);
            if(isInnerStreamParallel) {
                // Inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis
                parallelForEach(IntStream.range(0,numberOfTasksInInnerLoop), j -> sleep(10));
            }
            else {
                // Inner loop as sequential
                IntStream.range(0,numberOfTasksInInnerLoop).sequential().forEach(j -> sleep(10));
            }
            if(i >= 10) sleep(10 * 1000);
        });
        long end = System.nanoTime();
        System.out.println("Done in "+TimeUnit.NANOSECONDS.toSeconds(end-start)+" sec.");
    }
    static void sleep(int milli) {
        try {
            Thread.sleep(milli);
        } catch (InterruptedException ex) {
            throw new AssertionError(ex);
        }
    }
}
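
The piece of the code above that lets nested parallelForEach calls make progress is the rejection handler (r,e)->r.run(): because the SynchronousQueue holds no tasks, a submission that finds no idle worker is rejected and executed directly in the submitting thread, so an inner loop does not wait for a thread that is not available. The self-contained sketch below isolates that behavior with a one-thread pool; the class and method names are illustrative, not from the original. The JDK's ThreadPoolExecutor.CallerRunsPolicy is a similar built-in policy, which additionally discards tasks once the executor has been shut down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {

    // Submits a task while the pool's only worker is busy and returns the name
    // of the thread that actually ran it.
    public static String threadRunningOverflowTask() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 1, TimeUnit.MINUTES,
            new SynchronousQueue<>(),   // no buffering: hand off to an idle worker or reject
            (r, e) -> r.run());         // on rejection, run the task in the submitting thread
        try {
            // Occupy the single worker until the latch is released.
            Future<?> blocker = pool.submit(() -> {
                try {
                    release.await();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            // No idle worker and no queue capacity: this task is rejected and runs right here.
            Future<String> overflow = pool.submit(() -> Thread.currentThread().getName());
            String name = overflow.get();
            release.countDown();
            blocker.get();
            return name;
        } catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        } finally {
            release.countDown(); // never leave the worker blocked
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("submitting thread:    " + Thread.currentThread().getName());
        System.out.println("overflow task ran on: " + threadRunningOverflowTask());
    }
}
```

Both lines print the same thread name: the second task never reaches a pool thread. The trade-off is that a saturated pool silently turns submission into synchronous execution, which is exactly what keeps the nested loops from starving in the answer's approach.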
