递归并发 [英] Recursive concurrency

查看:625
本文介绍了递归并发的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下伪代码函数:

Result calc(Data data) {
  if (data.isFinal()) {
    return new Result(data); // This is the actual lengthy calculation
  } else {
    List<Result> results = new ArrayList<Result>();
    for (int i=0; i<data.numOfSubTasks(); ++i) {
      results.add(calc(data.subTask(i));
    }
    return new Result(results); // merge all results in to a single result
  }
}

我想使用固定数量的线程并行化。

I want to parallelize it, using a fixed number of threads.

我的第一次尝试是:

ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);

Result calc(Data data) {
  if (data.isFinal()) {
    return new Result(data); // This is the actual lengthy calculation
  } else {
    List<Result> results = new ArrayList<Result>();
    List<Callable<Void>> callables = new ArrayList<Callable<Void>>();
    for (int i=0; i<data.numOfSubTasks(); ++i) {
      callables.add(new Callable<Void>() {
        public Void call() {
         results.add(calc(data.subTask(i));
        }
      });
    }
    executorService.invokeAll(callables);  // wait for all sub-tasks to complete
    return new Result(results); // merge all results in to a single result
  }
}

,这很快陷入了僵局,因为,顶层递归级别等待所有线程完成,内部级别也等待线程变得可用...

However, this quickly got stuck in a deadlock, because, while the top recursion level waits for all threads to finish, the inner levels also wait for threads to become available...

如何在没有死锁的情况下高效地并行化我的程序?

How can I efficiently parallelize my program without deadlocks?

推荐答案

在使用ThreadPoolExecutor 。

Your problem is a general design problem when using ThreadPoolExecutor for tasks with dependencies.

我看到两个选项:

1)确保以自下而上的顺序提交任务,

1) Make sure to submit tasks in a bottom-up order, so that you never have a running task that depends on a task which didn't start yet.

2)使用直接切换策略(请参阅 ThreadPoolExecutor 文档):

2) Use the "direct handoff" strategy (See ThreadPoolExecutor documentation):

ThreadPoolExecutor executor = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
executor.setRejectedExecutionHandler(new CallerRunsPolicy());

这个想法是使用同步队列,这样任务就不会在真正的队列中等待。拒绝处理程序负责处理没有可用线程的任务。使用此特定处理程序,提交程序线程运行被拒绝的任务。

The idea is using a synchronous queue so that tasks never wait in a real queue. The rejection handler takes care of tasks which don't have an available thread to run on. With this particular handler, the submitter thread runs the rejected tasks.

此执行程序配置确保任务不会被拒绝,并且您不会由于任务间依赖性而产生死锁。

This executor configuration guarantees that tasks are never rejected, and that you never have deadlocks due to inter-task dependencies.

这篇关于递归并发的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆