Nested Java 8 parallel forEach loop perform poor. Is this behavior expected?


Question

Note: I already addressed this problem in another SO post (Using a semaphore inside a nested Java 8 parallel stream action may DEADLOCK. Is this a bug?), but the title of that post suggested that the problem is related to the use of a semaphore, which somewhat distracted the discussion. I am creating this one to stress that nested loops might have a performance issue, although both problems likely have a common cause (and maybe because it took me a lot of time to figure out this problem). (I don't see it as a duplicate, because it stresses another symptom, but if you do, just delete it.)

Problem: If you nest two Java 8 stream.parallel().forEach loops and all tasks are independent, stateless, etc. (except for being submitted to the common FJ pool), then nesting a parallel loop inside a parallel loop performs much worse than nesting a sequential loop inside a parallel loop. Even worse: if the operation containing the inner loop is synchronized, you will get a DEADLOCK.

Demonstration of the performance issue

Without the 'synchronized' you can still observe a performance problem. You can find demo code for this at: http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachTest.java (see the JavaDoc there for a more detailed description).

Our setup here is as follows: We have a nested stream.parallel().forEach().

  • The inner loop is independent (stateless, no interference, etc., except for the use of a common pool) and consumes 1 second in total in the worst case, namely if processed sequentially.
  • Half of the tasks of the outer loop consume 10 seconds prior to that loop.
  • Half consume 10 seconds after that loop.
  • Hence every outer-loop task consumes 11 seconds (worst case) in total.
  • We have a boolean which allows switching the inner loop from parallel() to sequential().

Now: submitting 24 outer-loop tasks to a pool with parallelism 8, we would expect 24/8 * 11 = 33 seconds at best (on a machine with 8 or more cores).
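The 33-second estimate can be reproduced with a few lines of arithmetic. The sketch below is only an illustration: the class name ExpectedRuntime and the method idealSeconds are hypothetical, and it assumes that every outer task costs a fixed 11 seconds and that the pool processes the tasks in equally sized waves.

```java
final class ExpectedRuntime {

    /**
     * Ideal wall-clock time in seconds, assuming every outer task takes
     * secondsPerOuterTask and the pool runs 'parallelism' tasks at a time.
     */
    static double idealSeconds(int outerTasks, int parallelism, double secondsPerOuterTask) {
        // Number of waves needed to get all outer tasks through the pool (rounded up).
        int waves = (outerTasks + parallelism - 1) / parallelism;
        return waves * secondsPerOuterTask;
    }

    public static void main(String[] args) {
        // 24 outer tasks, parallelism 8, 10 s of own work + 1 s inner loop per task.
        System.out.println(idealSeconds(24, 8, 10.0 + 1.0)); // prints 33.0
    }
}
```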

The result is:

  • With inner sequential loop: 33 seconds.
  • With inner parallel loop: >80 seconds (I had 92 seconds).

Question: Can you confirm this behavior? Is this something one would expect from the framework? (I am a bit more careful now with the claim that this is a bug, but I personally believe that it is due to a bug in the implementation of ForkJoinTask. Remark: I have posted this to concurrency-interest (see http://cs.oswego.edu/pipermail/concurrency-interest/2014-May/012652.html ), but so far I have not received confirmation from there.)

Demonstration of the deadlock

The following code will DEADLOCK:

    // Outer loop
    IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
        doWork();
        synchronized(this) {
            // Inner loop
            IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
                doWork();
            });
        }
    });

where numberOfTasksInOuterLoop = 24, numberOfTasksInInnerLoop = 240, outerLoopOverheadFactor = 10000 and doWork is some stateless CPU burner.
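For comparison, here is a minimal sketch of the same loop shape with a sequential inner loop inside the lock. It is not the demo code linked from this question: the class name SequentialInnerUnderLock, the lock object and the counter are hypothetical, and doWork is replaced by a trivial stand-in. A sequential inner stream runs entirely on the thread that holds the lock, so that thread never waits for other pool threads while holding it.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

final class SequentialInnerUnderLock {
    private final Object lock = new Object();
    private final AtomicLong workDone = new AtomicLong();

    // Stand-in for the stateless CPU burner called doWork in the text (assumption).
    private void doWork() {
        workDone.incrementAndGet();
    }

    long run(int outerTasks, int innerTasks) {
        // Outer loop: parallel, as in the question.
        IntStream.range(0, outerTasks).parallel().forEach(i -> {
            doWork();
            synchronized (lock) {
                // Inner loop: sequential, executed by the thread holding the lock.
                IntStream.range(0, innerTasks).forEach(j -> doWork());
            }
        });
        return workDone.get();
    }

    public static void main(String[] args) {
        // 24 outer calls plus 24 * 240 inner calls.
        System.out.println(new SequentialInnerUnderLock().run(24, 240)); // prints 5784
    }
}
```

This trades the inner parallelism for predictability; it says nothing about why the parallel variant deadlocks.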

You can find complete demo code at http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachAndSynchronization.java (see the JavaDoc there for a more detailed description).

Is this behavior expected? Note that the documentation on Java parallel streams does not mention any issue with nesting or synchronization. Also, the fact that both use a common fork-join pool is not mentioned.
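The shared pool in question is ForkJoinPool.commonPool(). A small sketch for inspecting it follows; the class name CommonPoolInfo is hypothetical. The target parallelism of the common pool can be set at JVM startup with the system property java.util.concurrent.ForkJoinPool.common.parallelism, and the value printed depends on the machine.

```java
import java.util.concurrent.ForkJoinPool;

final class CommonPoolInfo {

    // Target parallelism of the pool shared by all parallel streams in this JVM.
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("available processors:    " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + commonParallelism());
        // To change it, start the JVM with e.g.
        //   -Djava.util.concurrent.ForkJoinPool.common.parallelism=8
    }
}
```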

Update

Another test on the performance issue can be found at http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachBenchmark.java - this test comes without any blocking operation (no Thread.sleep and no synchronized). I compiled some more remarks here: http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html

Update 2

It appears that this problem and the more severe DEADLOCK with semaphores have been fixed in Java 8u40.

Solution

The problem is that the rather limited parallelism you have configured is eaten up by the outer stream processing: if you say that you want eight threads and process a stream of more than eight items with parallel(), it will create eight worker threads and let them process items.
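To check which threads actually pick up the outer items on a given machine, one can record the thread names inside the consumer. This is only a sketch: the class name OuterStreamThreads and the 20 ms sleep are assumptions for illustration, and the set of names depends on the JVM version and the configured parallelism, so no particular output is claimed.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

final class OuterStreamThreads {

    // Runs a parallel stream over 'items' elements and collects the names
    // of all threads that processed at least one element.
    static Set<String> threadsUsed(int items) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, items).parallel().forEach(i -> {
            names.add(Thread.currentThread().getName());
            try {
                Thread.sleep(20); // keep the thread busy so that other threads join in
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return names;
    }

    public static void main(String[] args) {
        threadsUsed(24).forEach(System.out::println);
    }
}
```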

Then within your consumer you are processing another stream using parallel(), but there are no worker threads left. Since the worker threads are blocked waiting for the end of the inner stream processing, the ForkJoinPool has to create new worker threads, which violates your configured parallelism. It seems to me that it does not recycle these extra threads but lets them die right after processing. So within your inner processing, new threads are created and disposed of, which is an expensive operation.

You might see it as a flaw that the initiating threads do not contribute to the computation of a parallel stream but just wait for the result. But even if that were fixed, you would still have a general problem that is hard (if at all possible) to fix:

Whenever the ratio of the number of worker threads to the number of outer stream items is low, the implementation will use them all for the outer stream, as it doesn’t know that the stream is an outer stream. So executing an inner stream in parallel requests more worker threads than are available. Using the caller thread to contribute to the computation could fix it in the sense that the performance equals that of the serial computation, but gaining an advantage from parallel execution here does not fit well with the concept of a fixed number of worker threads.

Note that you are only scratching the surface of this problem here, as you have rather balanced processing times for the items. If the processing times of both inner items and outer items diverge (compared to items on the same level), the problem will be even worse.
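One way to avoid the implementation having to guess which stream is the outer one is to hand it all the work on a single level. The sketch below flattens both loops into one parallel stream over index pairs. This is a different technique from anything described in this answer, and it only applies when an outer item has no work of its own besides the inner loop (which is not the case in the question's setup with its 10-second sleeps). The names FlattenedLoops and work are hypothetical.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

final class FlattenedLoops {

    // Hypothetical stand-in for the independent, stateless per-item work.
    static void work(int i, int j) {
        // intentionally empty
    }

    // Processes all (i, j) pairs in ONE parallel stream and returns how many were processed.
    static long run(int outerTasks, int innerTasks) {
        LongAdder processed = new LongAdder();
        IntStream.range(0, outerTasks * innerTasks).parallel().forEach(k -> {
            int i = k / innerTasks; // outer index
            int j = k % innerTasks; // inner index
            work(i, j);
            processed.increment();
        });
        return processed.sum();
    }

    public static void main(String[] args) {
        System.out.println(run(24, 100)); // prints 2400
    }
}
```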


Update: by profiling and looking at the code, it seems that the ForkJoinPool does attempt to use the waiting thread for "work stealing", but uses different code depending on whether the Thread is a worker thread or some other thread. As a result, a worker thread is actually waiting about 80% of the time and doing very little to no work, while other threads really contribute to the computation…


Update 2: for completeness, here is the simple parallel execution approach described in the comments. Since it enqueues every item, it is expected to have too much overhead when the execution time for a single item is rather small. So it’s not a sophisticated solution but rather a demonstration that it is possible to handle long-running tasks without much magic…

import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.*;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NestedParallelForEachTest1 {
    static final boolean isInnerStreamParallel = true;

    // Setup: inner loop task takes 0.01 sec in the worst case. Outer loop task: 10 sec + inner loop. This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec.
    static final int numberOfTasksInOuterLoop = 24;  // In real applications this can be a large number (e.g. > 1000).
    static final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
    static final int concurrentExecutionsLimitForStreams = 8;

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println(System.getProperty("java.version")+" "+System.getProperty("java.home"));
        new NestedParallelForEachTest1().testNestedLoops();
        E.shutdown();
    }

    final static ThreadPoolExecutor E = new ThreadPoolExecutor(
        concurrentExecutionsLimitForStreams, concurrentExecutionsLimitForStreams,
        2, TimeUnit.MINUTES, new SynchronousQueue<>(), (r,e)->r.run() );

    public static void parallelForEach(IntStream s, IntConsumer c) {
        s.mapToObj(i->E.submit(()->c.accept(i))).collect(Collectors.toList())
         .forEach(NestedParallelForEachTest1::waitOrHelp);
    }
    static void waitOrHelp(Future f) {
        while(!f.isDone()) {
            Runnable r=E.getQueue().poll();
            if(r!=null) r.run();
        }
        try { f.get(); }
        catch(InterruptedException ex) { throw new RuntimeException(ex); }
        catch(ExecutionException eex) {
            Throwable t=eex.getCause();
            if(t instanceof RuntimeException) throw (RuntimeException)t;
            if(t instanceof Error) throw (Error)t;
            throw new UndeclaredThrowableException(t);
        }
    }
    public void testNestedLoops(NestedParallelForEachTest1 this) {
        long start = System.nanoTime();
        // Outer loop
        parallelForEach(IntStream.range(0,numberOfTasksInOuterLoop), i -> {
            if(i < 10) sleep(10 * 1000);
            if(isInnerStreamParallel) {
                // Inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis
                parallelForEach(IntStream.range(0,numberOfTasksInInnerLoop), j -> sleep(10));
            }
            else {
                // Inner loop as sequential
                IntStream.range(0,numberOfTasksInInnerLoop).sequential().forEach(j -> sleep(10));
            }
            if(i >= 10) sleep(10 * 1000);
        });
        long end = System.nanoTime();
        System.out.println("Done in "+TimeUnit.NANOSECONDS.toSeconds(end-start)+" sec.");
    }
    static void sleep(int milli) {
        try {
            Thread.sleep(milli);
        } catch (InterruptedException ex) {
            throw new AssertionError(ex);
        }
    }
}
