How do Java 8 parallel streams behave on a thrown exception?


Question

How do Java 8 parallel streams behave when an exception is thrown in the consuming clause, for example inside forEach? Consider the following code:

final AtomicBoolean throwException = new AtomicBoolean(true);
IntStream.range(0, 1000)
    .parallel()
    .forEach(i -> {
        // Throw only on one of the threads.
        if (throwException.compareAndSet(true, false)) {
            throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
        }
    });

Does it stop handling elements immediately? Does it wait for the elements that have already started to finish? Does it wait for the whole stream to finish? Does it start handling new stream elements after the exception is thrown?

When does it return? Immediately after the exception? After all or some of the elements have been handled by the consumer?

Do elements continue to be handled after the parallel stream has thrown the exception? (I found a case where this happened.)

Is there a general rule here?

EDIT (15-11-2016)

While trying to determine whether the parallel stream returns early, I found that the behavior is not deterministic:

@Test
public void testParallelStreamWithException() {
    AtomicInteger overallCount = new AtomicInteger(0);
    AtomicInteger afterExceptionCount = new AtomicInteger(0);
    AtomicBoolean throwException = new AtomicBoolean(true);

    try {
        IntStream.range(0, 1000)
            .parallel()
            .forEach(i -> {
                overallCount.incrementAndGet();
                afterExceptionCount.incrementAndGet();
                try {
                    System.out.println(i + " Sleeping...");
                    Thread.sleep(1000);
                    System.out.println(i + " After Sleeping.");
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Throw only on one of the threads and not on main thread.
                if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {
                    System.out.println("Throwing exception - " + i);
                    throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
                }
            });
        Assert.fail("Should not get here.");
    }
    catch (Exception e) {
        System.out.println("Caught Exception. Resetting the afterExceptionCount to zero - 0.");
        afterExceptionCount.set(0);
    }
    System.out.println("Overall count: " + overallCount.get());
    System.out.println("After exception count: " + afterExceptionCount.get());
}

Late return when the exception is thrown from a thread other than the main thread. This caused a lot of new elements to be handled well after the exception was thrown. On my machine, about 200 elements were handled after the exception was thrown. However, not all 1000 elements were handled. So what is the rule here? Why were more elements handled even though the exception had been thrown?

Early return when the not (!) operator is removed, which causes the exception to be thrown in the main thread. Only the elements that had already started finished processing, and no new ones were handled. The stream returned early in this case, which is not consistent with the previous behavior.

What am I missing here?

Recommended answer

When an exception is thrown in one of the stages, the stream does not wait for the other operations to finish; the exception is re-thrown to the caller. That is how ForkJoinPool handles it.

In contrast, findFirst, for example, when run in parallel presents the result to the caller only after ALL operations have finished processing (even if the result is known before all of the operations have finished).

Put another way: it will return early, but will leave all the running tasks to finish.
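The point that the exception reaches the caller can be checked with a small sketch. The class name EarlyReturnDemo and the method runOnce are made up for illustration and are not part of the original question; only standard java.util.stream and java.util.concurrent.atomic classes are assumed. The number of elements already started when the caller resumes varies between runs and machines, so no expected output is shown.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of the original question or answer.
class EarlyReturnDemo {

    /** Runs the stream once and returns the message of the exception seen by the caller. */
    static String runOnce(AtomicInteger started) {
        AtomicBoolean throwException = new AtomicBoolean(true);
        try {
            IntStream.range(0, 1000)
                .parallel()
                .forEach(i -> {
                    started.incrementAndGet();
                    // Exactly one element throws; which one is not predictable.
                    if (throwException.compareAndSet(true, false)) {
                        throw new RuntimeException("Index: " + i);
                    }
                });
            return null; // not reached in this sketch: one element always throws
        } catch (RuntimeException e) {
            // The exception surfaces in the calling thread. It may be the original
            // exception or a copy of the same type that wraps it, so the message
            // can carry a class-name prefix.
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        AtomicInteger started = new AtomicInteger();
        System.out.println("Caught: " + runOnce(started));
        System.out.println("Elements started when the caller resumed: " + started.get());
    }
}
```

Running main a few times should show the second number changing from run to run, which matches the non-deterministic behavior described in the question.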

Edit to answer the last comment

This is largely explained by Holger's answer (link in comments), but here are some details.

1) When you kill all threads BUT the main thread, you also kill all the tasks that were supposed to be handled by those threads. So that number should actually be closer to 250, as there are 1000 tasks and 4 threads. I assume this returns 3?:

int result = ForkJoinPool.getCommonPoolParallelism();

Theoretically, there are 1000 tasks and 4 threads, each supposed to handle 250 tasks. You then kill 3 of them, meaning 750 tasks are lost. That leaves 250 tasks to execute, and ForkJoinPool will spawn 3 new threads to execute these 250 remaining tasks.
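As a rough check of the "4 threads" assumption: unless the java.util.concurrent.ForkJoinPool.common.parallelism system property overrides it, the common pool's parallelism defaults to one less than the number of available processors, and the calling thread takes part in the work as well. The class name PoolSizeDemo below is made up for illustration.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class: prints the values the reasoning above depends on.
class PoolSizeDemo {

    /** Target parallelism of the common pool used by parallel streams by default. */
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

On a machine with 4 available processors and default settings, the second value would be expected to be 3, which together with the main thread gives the 4 workers assumed above.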

There are a few things you can try. Change your stream like this (making the stream not sized):

IntStream.generate(random::nextInt).limit(1000).parallel().forEach

This time, many more operations would finish, because the initial split index is unknown and is chosen by some other strategy. You could also try changing this:

 if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {

to this:

 if (!Thread.currentThread().getName().equals("main")) {

This time you would always kill all threads besides main, up to the point where ForkJoinPool creates no new threads because the task is too small to split, so no other threads are needed. In this case even fewer tasks would finish.
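The difference between the sized range stream and the non-sized generate/limit stream mentioned in point 1 can be inspected through the stream's spliterator. This is only a sketch: the class name SizedCheck and the helper isSized are made up for illustration, and it reports the SIZED characteristic rather than showing how the splitting itself proceeds.

```java
import java.util.Random;
import java.util.Spliterator;
import java.util.stream.IntStream;

// Hypothetical helper class for inspecting whether a stream reports a known size.
class SizedCheck {

    /** Consumes the stream and reports whether its spliterator has the SIZED characteristic. */
    static boolean isSized(IntStream stream) {
        return stream.spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    public static void main(String[] args) {
        Random random = new Random();
        System.out.println("range:            " + isSized(IntStream.range(0, 1000)));
        System.out.println("generate + limit: " + isSized(IntStream.generate(random::nextInt).limit(1000)));
    }
}
```

A source that reports its size can be split into evenly sized halves up front, whereas a source without a known size has to be split by a different strategy, which is why the number of finished elements changes.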

2) In your second example, where you actually kill the main thread, the code as written will not show you that the other threads are still running. Change it:

    } catch (Exception e) {
        System.out.println("Caught Exception. Resetting the afterExceptionCount to zero - 0.");
        afterExceptionCount.set(0);
    }

    // Give the other threads some time to finish their work. Try commenting and uncommenting this line to see a big difference in the results.
    // The enclosing method must declare "throws InterruptedException" for this call to compile.
    TimeUnit.SECONDS.sleep(60);

    System.out.println("Overall count: " + overallCount.get());
    System.out.println("After exception count: " + afterExceptionCount.get());
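As a possible alternative to a fixed 60-second sleep, and assuming the stream runs in the common pool (the default for parallel streams), the caller could wait with ForkJoinPool.commonPool().awaitQuiescence. The sketch below is not the original test: the class name QuiescenceDemo and the method run are made up, and the sleep is shortened to 5 ms so it finishes quickly. It returns the element count at the moment the exception was caught and the count after the pool went quiet (or the timeout elapsed).

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical variant of the experiment that waits for the pool instead of sleeping.
class QuiescenceDemo {

    /** Returns { count when the exception was caught, count after waiting for the pool }. */
    static int[] run() {
        AtomicInteger overallCount = new AtomicInteger();
        AtomicBoolean throwException = new AtomicBoolean(true);
        try {
            IntStream.range(0, 1000).parallel().forEach(i -> {
                overallCount.incrementAndGet();
                try {
                    Thread.sleep(5); // simulated work, much shorter than in the question
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                if (throwException.compareAndSet(true, false)) {
                    throw new RuntimeException("Index: " + i);
                }
            });
        } catch (RuntimeException e) {
            // Expected: the caller resumes here while other workers may still be busy.
        }
        int atCatch = overallCount.get();
        // Wait until the common pool is idle, or give up after 60 seconds.
        ForkJoinPool.commonPool().awaitQuiescence(60, TimeUnit.SECONDS);
        return new int[] { atCatch, overallCount.get() };
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("Count when exception was caught: " + counts[0]);
        System.out.println("Count after the pool went quiet: " + counts[1]);
    }
}
```

Any difference between the two numbers corresponds to elements that were handled after the caller had already received the exception.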
