Using a semaphore inside a nested Java 8 parallel stream action may DEADLOCK. Is this a bug?

Question

Consider the following situation: We are using a Java 8 parallel stream to perform a parallel forEach loop, e.g.,

IntStream.range(0,20).parallel().forEach(i -> { /* work done here */})

The number of parallel threads is controlled by the system property "java.util.concurrent.ForkJoinPool.common.parallelism" and is usually equal to the number of processors.
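To see what this setting amounts to on a given machine, a minimal sketch like the following can help. The class name `CommonPoolParallelism` and its helper method are made up for illustration; `ForkJoinPool.getCommonPoolParallelism()` and `Runtime.availableProcessors()` are standard JDK calls. As far as I know, the property is read once when the common pool is initialized, so it has to be set before the first parallel stream runs.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper class, for illustration only.
public class CommonPoolParallelism {

    /** Returns the targeted parallelism of the common pool used by parallel streams. */
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        // The property is null unless it was set explicitly (e.g. via -D on the command line).
        System.out.println("property                = "
                + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));
        System.out.println("available processors    = "
                + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism = " + commonPoolParallelism());
    }
}
```

Note that the thread calling a parallel stream's terminal operation also takes part in the work, so the number of threads working on a stream can be one more than the reported pool parallelism.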

Now assume that we would like to limit the number of parallel executions for a specific piece of work, e.g. because that part is memory intensive and memory constraints imply a limit on parallel executions.

An obvious and elegant way to limit parallel executions is to use a Semaphore (suggested here), e.g., the following piece of code limits the number of parallel executions to 5:

        final Semaphore concurrentExecutions = new Semaphore(5);
        IntStream.range(0,20).parallel().forEach(i -> {

            concurrentExecutions.acquireUninterruptibly();

            try {
                /* WORK DONE HERE */
            }
            finally {
                concurrentExecutions.release();
            }
        });

This works just fine!

However: Using any other parallel stream inside the worker (at /* WORK DONE HERE */) may result in a deadlock.

For me, this is unexpected behavior.

Explanation: Since Java streams use a ForkJoin pool, the inner forEach is forking, and the join appears to wait forever. However, this behavior is still unexpected. Note that parallel streams even work if you set "java.util.concurrent.ForkJoinPool.common.parallelism" to 1.

Note also that it may not be apparent to the caller whether the code being called uses an inner parallel forEach.

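Question: Is this behavior in accordance with the Java 8 specification (in which case it would mean that using semaphores inside parallel stream workers is forbidden), or is this a bug?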

For convenience: Below is a complete test case. Any combination of the two booleans works, except "true, true", which results in the deadlock.

Clarification: To make the point clear, let me stress one aspect: The deadlock does not occur at the acquire of the semaphore. Note that the code consists of

  1. acquire the semaphore
  2. run some code
  3. release the semaphore

and the deadlock occurs at 2. if that piece of code is using ANOTHER parallel stream. Then the deadlock occurs inside that OTHER stream. As a consequence it appears that it is not allowed to use nested parallel streams and blocking operations (like a semaphore) together!

Note that it is documented that parallel streams use a ForkJoinPool and that ForkJoinPool and Semaphore belong to the same package - java.util.concurrent (so one would expect that they interoperate nicely).

/*
 * (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: email@christian-fries.de.
 *
 * Created on 03.05.2014
 */
package net.finmath.experiments.concurrency;

import java.util.concurrent.Semaphore;
import java.util.stream.IntStream;

/**
 * This is a test of Java 8 parallel streams.
 * 
 * The idea behind this code is that the Semaphore concurrentExecutions
 * should limit the parallel executions of the outer forEach (which is an
 * <code>IntStream.range(0,numberOfTasks).parallel().forEach</code>). For example,
 * the parallel executions of the outer forEach should be limited due to a
 * memory constraint.
 * 
 * Inside the execution block of the outer forEach we use another parallel stream
 * to create an inner forEach. The number of concurrent
 * executions of the inner forEach is not limited by us (it is however limited by a
 * system property "java.util.concurrent.ForkJoinPool.common.parallelism").
 * 
 * Problem: If the semaphore is used AND the inner forEach is active, then
 * the execution will be DEADLOCKED.
 * 
 * Note: A practical application is the implementation of the parallel
 * LevenbergMarquardt optimizer in
 * {@link http://finmath.net/java/finmath-lib/apidocs/net/finmath/optimizer/LevenbergMarquardt.html}
 * In one application the number of tasks in the outer and inner loop is very large (>1000)
 * and due to memory limitation the outer loop should be limited to a small (5) number
 * of concurrent executions.
 * 
 * @author Christian Fries
 */
public class ForkJoinPoolTest {

    public static void main(String[] args) {

        // Any combination of the booleans works, except (true,true)
        final boolean isUseSemaphore    = true;
        final boolean isUseInnerStream  = true;

        final int       numberOfTasksInOuterLoop = 20;              // In real applications this can be a large number (e.g. > 1000).
        final int       numberOfTasksInInnerLoop = 100;             // In real applications this can be a large number (e.g. > 1000).
        final int       concurrentExecutionsLimitInOuterLoop = 5;
        final int       concurrentExecutionsLimitForStreams = 10;

        final Semaphore concurrentExecutions = new Semaphore(concurrentExecutionsLimitInOuterLoop);

        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams));
        System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));

        IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {

            if(isUseSemaphore) {
                concurrentExecutions.acquireUninterruptibly();
            }

            try {
                System.out.println(i + "\t" + concurrentExecutions.availablePermits() + "\t" + Thread.currentThread());

                if(isUseInnerStream) {
                    runCodeWhichUsesParallelStream(numberOfTasksInInnerLoop);
                }
                else {
                    try {
                        Thread.sleep(10*numberOfTasksInInnerLoop);
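                    } catch (InterruptedException e) {
                        // Restore the interrupt flag instead of silently swallowing it.
                        Thread.currentThread().interrupt();
                    }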
                }
            }
            finally {
                if(isUseSemaphore) {
                    concurrentExecutions.release();
                }
            }
        });

        System.out.println("D O N E");
    }

    /**
     * Runs code in a parallel forEach using streams.
     * 
     * @param numberOfTasksInInnerLoop Number of tasks to execute.
     */
    private static void runCodeWhichUsesParallelStream(int numberOfTasksInInnerLoop) {
        IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
            try {
                Thread.sleep(10);
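            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently swallowing it.
                Thread.currentThread().interrupt();
            }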
        });
    }
}

Recommended answer

Any time you are decomposing a problem into tasks, where those tasks could be blocked on other tasks, and try and execute them in a finite thread pool, you are at risk for pool-induced deadlock. See Java Concurrency in Practice 8.1.

This is unquestionably a bug -- in your code. You're filling up the FJ pool with tasks that are going to block waiting for the results of other tasks in the same pool. Sometimes you get lucky and things manage to not deadlock (just like not all lock-ordering errors result in deadlock all the time), but fundamentally you're skating on some very thin ice here.
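One way to act on this advice, offered only as a sketch and not as something the answer itself prescribes, is to keep the blocking limit out of the FJ pool entirely: run the outer loop on a dedicated fixed-size executor whose size is the desired limit, and let only the inner loop use a parallel stream. The class `BoundedOuterLoop`, the method `runOuterLoop` and the helper `sleepQuietly` are hypothetical names introduced for this illustration, and the sleep stands in for real work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical sketch: bound the outer loop with a dedicated pool instead of a semaphore.
public class BoundedOuterLoop {

    /** Runs the nested loops and returns the highest number of outer tasks seen running at once. */
    static int runOuterLoop(int outerTasks, int innerTasks, int outerLimit) {
        // The pool size, not a semaphore, limits the concurrency of the outer loop.
        ExecutorService outerPool = Executors.newFixedThreadPool(outerLimit);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < outerTasks; i++) {
                futures.add(outerPool.submit(() -> {
                    maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        // The inner loop still uses the common pool, but no common-pool
                        // worker ever blocks while waiting for a permit.
                        IntStream.range(0, innerTasks).parallel().forEach(j -> sleepQuietly(1));
                    } finally {
                        running.decrementAndGet();
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for completion and surface any failure
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            outerPool.shutdown();
        }
        return maxRunning.get();
    }

    /** Stand-in for real work. */
    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int max = runOuterLoop(20, 100, 5);
        System.out.println("max concurrent outer tasks = " + max);
    }
}
```

The design choice here is that threads which may have to wait for a slot belong to a pool that nothing in the FJ pool depends on, so the pool-induced deadlock described above should not arise from the limit itself.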
