Nested parallel streams in Java


Question

I want to understand the ordering constraints between nested streams in Java.

Example 1:

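import java.util.stream.IntStream;

public class Example1 {
    public static void main(String[] args) {
        // Both streams are sequential, so the output order is fixed.
        IntStream.range(0, 10).forEach(i -> {
            System.out.println(i);
            IntStream.range(0, 10).forEach(j -> {
                System.out.println("    " + i + " " + j);
            });
        });
    }
}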

This code executes deterministically: the inner forEach runs over every j before the outer forEach moves on to the next i:

0
    0 0
    0 1
    0 2
    0 3
    0 4
    0 5
    0 6
    0 7
    0 8
    0 9
1
    1 0
    1 1
    1 2
    1 3
    1 4
    1 5
    1 6
    1 7
    1 8
    1 9
2
    2 0
    2 1
    2 2
    2 3
...

Example 2:

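import java.util.stream.IntStream;

public class Example2 {
    public static void main(String[] args) {
        // Both the outer and the inner stream are parallel.
        IntStream.range(0, 10).parallel().forEach(i -> {
            System.out.println(i);
            IntStream.range(0, 10).parallel().forEach(j -> {
                System.out.println("    " + i + " " + j);
            });
        });
    }
}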

If the streams are made parallel(), as in this second example, I could imagine a deadlock: the threads running outer tasks have to block until their inner stream completes, while the inner tasks wait for one of those same threads to become available, and the default thread pool only has a limited number of threads. However, deadlock does not appear to occur:

6
5
8
    8 6
0
1
    6 2
7
    1 6
    8 5
    7 6
    8 8
2
    0 6
    0 2
    0 8
    5 2
    5 4
    5 6
    0 5
    2 6
    7 2
    7 5
    7 8
    6 4
    8 9
    1 5
 ...

Both streams share the same default thread pool, yet they generate different work units. Each outer work unit can only complete after all inner units belonging to it have completed, since there is a completion barrier at the end of each parallel stream.

How is the coordination between these inner and outer streams managed across the shared pool of worker threads without any sort of deadlock?

Recommended answer

The thread pool behind parallel streams is the common pool, which you can obtain with ForkJoinPool.commonPool(). By default it usually uses (number of available processors - 1) workers. To resolve dependencies like the one you describe, it is able to dynamically create additional workers if (some) current workers are blocked and a deadlock becomes possible.
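To check the pool size on a particular machine, a small sketch like the following may help. The class name CommonPoolInfo and the helper commonParallelism are made up for illustration; only ForkJoinPool.commonPool(), getParallelism() and Runtime.availableProcessors() come from the JDK. The reported value can differ from "processors - 1" if the common pool has been configured through system properties.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper class, not part of the original answer.
class CommonPoolInfo {
    // Target parallelism of the common pool, which parallel streams use by default.
    static int commonParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("available processors:    " + cpus);
        System.out.println("common pool parallelism: " + commonParallelism());
    }
}
```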

However, that mechanism is not what prevents a deadlock in your case.

Tasks in a ForkJoinPool have two important capabilities:

  • They can create subtasks, splitting the current task into smaller pieces (fork).
  • They can wait for subtasks (join).
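As a rough illustration of these two operations, here is a minimal sketch of a task that sums a range of integers. The class RangeSum and its THRESHOLD value are assumptions chosen for this example and do not appear in the original answer; RecursiveTask, fork(), join() and ForkJoinPool.invoke() are standard JDK API.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example task: sums the integers in the half-open range [from, to).
class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // arbitrary cutoff for this sketch
    private final int from;
    private final int to;

    RangeSum(int from, int to) {
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: do the work sequentially.
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
        int mid = from + (to - from) / 2;
        RangeSum left = new RangeSum(from, mid);
        RangeSum right = new RangeSum(mid, to);
        left.fork();                     // fork: make the left half available to other workers
        long rightSum = right.compute(); // keep working on the right half in this thread
        return left.join() + rightSum;   // join: wait for (or help finish) the left half
    }

    static long sum(int from, int to) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(from, to));
    }

    public static void main(String[] args) {
        System.out.println(sum(0, 100_000));
    }
}
```

Computing one half directly and forking only the other is a common pattern, since it keeps the current thread busy instead of queueing both halves.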

When a thread executes such a task A and joins a subtask B, it does not simply block until B has finished; it executes another task C in the meantime. When C is finished, the thread comes back to A and checks whether B is finished. Note that B and C can be (and most likely are) the same task. If B is finished, then A has successfully waited for/joined it (non-blocking!). If this explanation is not clear, an introductory guide to the fork/join framework may help.

Now, when you use a parallel stream, the range of the stream is split into tasks recursively until the tasks become so small that they are more efficiently executed sequentially. Those tasks are put into a work queue (there is one for each worker) in the common pool. So what IntStream.range(0, 100).parallel().forEach does is split up the range recursively until it is no longer worth it. Each final task, or rather batch of iterations, is then executed sequentially with the code provided to forEach. At this point the workers in the common pool simply execute those tasks until all are done and the stream can return. Note that the calling thread helps out with the execution by joining subtasks!
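One way to observe this is to record which thread handles each element. The sketch below is only an illustration: the class WhoRunsWhat and its methods are hypothetical names, and the exact distribution of elements over threads will vary from run to run and machine to machine.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

// Hypothetical helper class, not part of the original answer.
class WhoRunsWhat {
    // Runs a parallel forEach over [0, n) and counts the elements handled by each thread.
    static Map<String, LongAdder> countPerThread(int n) {
        Map<String, LongAdder> counts = new ConcurrentHashMap<>();
        IntStream.range(0, n).parallel().forEach(i ->
            counts.computeIfAbsent(Thread.currentThread().getName(), k -> new LongAdder())
                  .increment());
        return counts;
    }

    // Adds up the per-thread counters.
    static long total(Map<String, LongAdder> counts) {
        return counts.values().stream().mapToLong(LongAdder::sum).sum();
    }

    public static void main(String[] args) {
        Map<String, LongAdder> counts = countPerThread(100);
        counts.forEach((thread, c) -> System.out.println(thread + " handled " + c.sum()));
        System.out.println("total: " + total(counts));
    }
}
```

When run from main, the calling thread typically shows up in the list next to the ForkJoinPool.commonPool-worker threads.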

In your case, each of those tasks uses a parallel stream itself. The procedure is the same: split it up into smaller tasks and put those tasks into a work queue in the common pool. From the ForkJoinPool's perspective, those are just additional tasks on top of the ones already present. The workers just keep executing/joining tasks until all are done and the outer stream can return.

This is what you see in the output: there is no deterministic behaviour and no fixed order. A deadlock cannot occur either, because in the given use case no threads block.
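The completion barrier mentioned in the question can be checked with a small sketch that counts iterations instead of printing them. The class NestedCompletion and the method runNested are hypothetical names introduced for this example; the sketch assumes nothing beyond the standard stream and atomic classes.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical helper class, not part of the original answer.
class NestedCompletion {
    // Runs an n-by-n nested parallel forEach and returns how many (i, j) pairs were executed.
    static int runNested(int n) {
        AtomicInteger pairs = new AtomicInteger();
        IntStream.range(0, n).parallel().forEach(i -> {
            AtomicInteger inner = new AtomicInteger();
            IntStream.range(0, n).parallel().forEach(j -> inner.incrementAndGet());
            // The inner forEach has returned, so all n inner iterations for this i are done.
            if (inner.get() != n) {
                throw new IllegalStateException("inner stream returned early");
            }
            pairs.addAndGet(inner.get());
        });
        return pairs.get();
    }

    public static void main(String[] args) {
        System.out.println(runNested(10));
    }
}
```

If the program terminates and reports n * n pairs, every inner stream completed before its outer iteration finished, and no worker got stuck waiting.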

You can check this explanation with the following code:

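    import java.util.stream.IntStream;

    public class NestedParallelDemo {
        public static void main(String[] args) {
            IntStream.range(0, 10).parallel().forEach(i -> {
                IntStream.range(0, 10).parallel().forEach(j -> {
                    // Busy work before and after the print so that each iteration takes some time.
                    for (int x = 0; x < 1e6; x++) { Math.sqrt(Math.log(x)); }
                    // Print which thread executes this inner iteration.
                    System.out.printf("%d %d %s\n", i, j, Thread.currentThread().getName());
                    for (int x = 0; x < 1e6; x++) { Math.sqrt(Math.log(x)); }
                });
            });
        }
    }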

You should notice that the main thread takes part in executing the inner iterations, so it is not (!) blocked. The common pool workers simply pick up one task after another until all are finished.
