Can I use the work-stealing behaviour of ForkJoinPool to avoid a thread starvation deadlock?


Question

A thread starvation deadlock occurs in a normal thread pool if all the threads in the pool are waiting for queued tasks in the same pool to complete. ForkJoinPool avoids this problem by stealing work from other threads from inside the join() call, rather than simply waiting. For example:

private static class ForkableTask extends RecursiveTask<Integer> {
    private final CyclicBarrier barrier;

    ForkableTask(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    protected Integer compute() {
        try {
            barrier.await();
            return 1;
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }
}

@Test
public void testForkJoinPool() throws Exception {
    final int parallelism = 4;
    final ForkJoinPool pool = new ForkJoinPool(parallelism);
    final CyclicBarrier barrier = new CyclicBarrier(parallelism);

    final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism);
    for (int i = 0; i < parallelism; ++i) {
        forkableTasks.add(new ForkableTask(barrier));
    }

    int result = pool.invoke(new RecursiveTask<Integer>() {
        @Override
        protected Integer compute() {
            for (ForkableTask task : forkableTasks) {
                task.fork();
            }

            int result = 0;
            for (ForkableTask task : forkableTasks) {
                result += task.join();
            }
            return result;
        }
    });
    assertThat(result, equalTo(parallelism));
}

But when using the ExecutorService interface to a ForkJoinPool, work-stealing doesn't seem to occur. For example:

private static class CallableTask implements Callable<Integer> {
    private final CyclicBarrier barrier;

    CallableTask(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public Integer call() throws Exception {
        barrier.await();
        return 1;
    }
}

@Test
public void testWorkStealing() throws Exception {
    final int parallelism = 4;
    final ExecutorService pool = new ForkJoinPool(parallelism);
    final CyclicBarrier barrier = new CyclicBarrier(parallelism);

    final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier));
    int result = pool.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            int result = 0;
            // Deadlock in invokeAll(), rather than stealing work
            for (Future<Integer> future : pool.invokeAll(callableTasks)) {
                result += future.get();
            }
            return result;
        }
    }).get();
    assertThat(result, equalTo(parallelism));
}

From a cursory look at ForkJoinPool's implementation, all the regular ExecutorService APIs are implemented using ForkJoinTasks, so I'm not sure why a deadlock occurs.

Answer

You have almost answered your own question. The key is your own statement that "ForkJoinPool avoids this problem by stealing work from other threads from inside the join() call". Whenever a thread is blocked for any reason other than ForkJoinTask.join(), this work stealing does not occur, and the thread just waits and does nothing.

The reason for this is that in Java it is not possible for the ForkJoinPool to prevent its threads from blocking and instead give them something else to work on. The thread itself needs to avoid blocking and instead ask the pool for work it should do. And this is only implemented in the ForkJoinTask.join() method, not in any other blocking method. If you use a Future inside a ForkJoinPool, you will also see the starvation deadlock.
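As a minimal sketch of this cooperation (the class JoinHelpsDemo, the task SumTask and the helper sumWithSingleWorker are hypothetical names, not part of the question), the following runs a recursive sum in a pool with a single worker thread. Every join() is called while the joined subtask is still queued, so the only way the computation can finish is for the joining thread to run that subtask itself instead of waiting.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class JoinHelpsDemo {
    /** Sums the integers in [from, to) by recursively splitting the range in half. */
    static class SumTask extends RecursiveTask<Long> {
        private final int from;
        private final int to;

        SumTask(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= 4) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(from, mid);
            SumTask right = new SumTask(mid, to);
            left.fork();                      // queue the left half for later
            long rightSum = right.compute();  // work on the right half directly
            // join() does not simply park the thread: if no other worker has
            // taken the queued left half, the joining thread executes it itself.
            return rightSum + left.join();
        }
    }

    /** Assumption for illustration: parallelism 1, so no second worker can help. */
    static long sumWithSingleWorker(int n) {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            return pool.invoke(new SumTask(0, n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumWithSingleWorker(1000)); // prints 499500
    }
}
```

The same structure written against a single-threaded ordinary executor, with nested submit() and Future.get() calls, would leave its only thread waiting for a task that can never start.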

Why is work stealing only implemented in ForkJoinTask.join() and not in any other blocking method in the Java API? Well, there are many such blocking methods (Object.wait(), Future.get(), any of the concurrency primitives in java.util.concurrent, I/O methods, etc.), and they have nothing to do with ForkJoinPool, which is just an arbitrary class in the API, so adding special cases to all these methods would be bad design. It would also lead to possibly very surprising and undesired effects. Imagine, for example, a user passing a task to an ExecutorService that waits on a Future, and then finding out that the task hangs for a long time in Future.get() just because the running thread stole some other (long-running) work item instead of waiting for the Future and continuing immediately after the result is available. Once a thread starts working on another task, it cannot return to the original task until the second task is finished. Thus it is actually a good thing that other blocking methods do not do work stealing. For a ForkJoinTask, this problem does not exist, because it is not important that the primary task continues as soon as possible; it is only important that all tasks together are handled as efficiently as possible.

It is also not possible to implement your own method for doing work stealing inside a ForkJoinPool, because none of the relevant parts are public.

However, there is actually a second way to prevent starvation deadlocks. This is called managed blocking. It does not use work stealing (to avoid the problem mentioned above), but it also needs the thread that is going to be blocked to actively cooperate with the thread pool. With managed blocking, the thread tells the thread pool that it may be blocked before it calls the potentially blocking method, and also informs the pool when the blocking method is finished. The thread pool then knows that there is a risk of a starvation deadlock, and may spawn additional threads if all of its threads are currently in some blocking operation and there are still other tasks to execute. Note that this is less efficient than work stealing, because of the overhead of the additional threads. If you implement a recursive parallel algorithm with ordinary futures and managed blocking instead of with ForkJoinTask and work stealing, the number of additional threads can get very large (because in the "divide" phase of the algorithm, a lot of tasks will be created and given to threads that immediately block and wait for results from sub-tasks). However, a starvation deadlock is still prevented, and it avoids the problem that a task has to wait a long time because its thread started working on another task in the meantime.

The ForkJoinPool of Java also supports managed blocking. To use this, implement the interface ForkJoinPool.ManagedBlocker so that the potentially blocking method the task wants to execute is called from within the block() method of this interface. The task must then not call the blocking method directly, but instead call the static method ForkJoinPool.managedBlock(ManagedBlocker). This method handles the communication with the thread pool before and after the blocking. It also works if the current task is not executing within a ForkJoinPool; in that case it just calls the blocking method.
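A minimal sketch of this pattern, under some assumptions: the class ManagedBlockingDemo, the blocker QueueTaker (modelled on the example in the ManagedBlocker Javadoc) and the helper runWithSingleWorker are hypothetical names, and the pool is deliberately given a parallelism of 1. The outer task schedules a producer in the same pool and then waits for its result on a BlockingQueue. Because the wait goes through ForkJoinPool.managedBlock(), the pool is expected to start a spare thread that runs the producer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ManagedBlockingDemo {
    /** Wraps BlockingQueue.take() so that the pool is told about the blocking call. */
    static class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
        private final BlockingQueue<E> queue;
        private volatile E item;

        QueueTaker(BlockingQueue<E> queue) {
            this.queue = queue;
        }

        @Override
        public boolean block() throws InterruptedException {
            if (item == null) {
                item = queue.take();   // the potentially blocking call
            }
            return true;               // no further blocking is necessary
        }

        @Override
        public boolean isReleasable() {
            // True if an item is already available, so blocking can be skipped.
            return item != null || (item = queue.poll()) != null;
        }

        E getItem() {
            return item;
        }
    }

    /** Assumption for illustration: a pool whose only worker is about to block. */
    static int runWithSingleWorker() {
        ForkJoinPool pool = new ForkJoinPool(1);
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        try {
            return pool.submit(() -> {
                // Producer task queued in the same pool; it cannot run on this thread
                // while this thread is blocked.
                pool.execute(() -> queue.add(42));
                QueueTaker<Integer> taker = new QueueTaker<>(queue);
                // Tell the pool before blocking so that it can compensate.
                ForkJoinPool.managedBlock(taker);
                return taker.getItem();
            }).get(10, TimeUnit.SECONDS); // timeout so that a deadlock shows up as an error
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithSingleWorker());
    }
}
```

Replacing the managedBlock() call with a plain queue.take() leaves the single worker blocked with the producer still queued behind it, which is the starvation deadlock from the question.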

The only place I have found in the Java API (for Java 7) that actually uses managed blocking is the class Phaser. (This class is a synchronization barrier like mutexes and latches, but more flexible and powerful.) So synchronizing with a Phaser inside a ForkJoinPool task should use managed blocking and can avoid starvation deadlocks (but ForkJoinTask.join() is still preferable because it uses work stealing instead of managed blocking). This works regardless of whether you use the ForkJoinPool directly or via its ExecutorService interface. However, it will not work if you use any other ExecutorService like those created by the class Executors, because these do not support managed blocking.
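As a hedged sketch of what this means for the test from the question (the class PhaserInPoolDemo and the helper runPhaserTasks are hypothetical names), the following replaces the CyclicBarrier with a Phaser and submits the tasks through the pool's ExecutorService-style submit() method. It assumes that the pool is allowed to create spare threads, which is the documented behaviour of a Phaser used inside a ForkJoinPool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

class PhaserInPoolDemo {
    /**
     * Runs `parties` tasks that all wait for each other on a Phaser, inside a
     * pool with fewer worker threads than parties.
     */
    static int runPhaserTasks(int parallelism, int parties) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        Phaser phaser = new Phaser(parties);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < parties; i++) {
                futures.add(pool.submit(() -> {
                    // Blocks until all parties have arrived; Phaser reports the
                    // wait to the pool, which can then add spare threads.
                    phaser.arriveAndAwaitAdvance();
                    return 1;
                }));
            }
            int result = 0;
            for (Future<Integer> future : futures) {
                // Timeout so that a starvation deadlock fails instead of hanging.
                result += future.get(10, TimeUnit.SECONDS);
            }
            return result;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runPhaserTasks(2, 4));
    }
}
```

With a CyclicBarrier in place of the Phaser, the first two tasks would occupy both workers in await() and the remaining two would never start.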

In Scala, the use of managed blocking is more widespread (description, API).
