Parallel execution of a directed acyclic graph of tasks

Problem description

I have a list of tasks [Task-A, Task-B, Task-C, Task-D, ...].
A task can optionally depend on other tasks.

For example:
A can be dependent on 3 tasks: B, C and D
B can be dependent on 2 tasks: C and E

It's basically a directed acyclic graph: a task may be executed only after all the tasks it depends on have been executed.

At any point in time, several tasks may be ready for execution. In such a case, we can run them in parallel.

Any idea on how to implement such an execution while having as much parallelism as possible?

class Task {
    private String name;
    private List<Task> dependentTasks; // the tasks this task depends on

    public void run() {
        // business logic
    }
}
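For reference, the A-E example can be encoded with a class shaped like the question's Task and checked for which tasks may start immediately: those with no dependencies (C, D and E here) form the first batch that can run in parallel. This is only a sketch; the constructor, the getters and the initiallyReady helper are hypothetical additions, not part of the question's code.

```java
import java.util.ArrayList;
import java.util.List;

public class ReadyTasksExample {
    // Shaped like the question's Task class; the constructor and getters are hypothetical.
    static class Task {
        private final String name;
        private final List<Task> dependentTasks = new ArrayList<>(); // tasks this one waits for

        Task(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }

        List<Task> getDependentTasks() {
            return dependentTasks;
        }

        public void run() {
            // business logic
        }
    }

    /** Hypothetical helper: names of the tasks that have no dependencies and can start right away. */
    static List<String> initiallyReady(List<Task> tasks) {
        List<String> ready = new ArrayList<>();
        for (Task task : tasks) {
            if (task.getDependentTasks().isEmpty()) {
                ready.add(task.getName());
            }
        }
        return ready;
    }

    public static void main(String[] args) {
        Task a = new Task("A");
        Task b = new Task("B");
        Task c = new Task("C");
        Task d = new Task("D");
        Task e = new Task("E");
        a.getDependentTasks().addAll(List.of(b, c, d)); // A depends on B, C and D
        b.getDependentTasks().addAll(List.of(c, e));    // B depends on C and E
        System.out.println(initiallyReady(List.of(a, b, c, d, e))); // prints [C, D, E]
    }
}
```

Once C and E finish, B becomes ready; A becomes ready only after B, C and D have all finished.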

Solution

The other answer works fine but is too complicated.

A simpler way is to run Kahn's algorithm in parallel.
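For comparison, here is a minimal sequential sketch of Kahn's algorithm in Java. It assumes the graph is given as a map from each task id to the ids it depends on (that input shape and the class name KahnTopoSort are assumptions, not from the answer). A task enters the ready queue once its count of unfinished dependencies drops to zero; any task left over means the graph has a cycle.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KahnTopoSort {
    /**
     * Sequential Kahn's algorithm. deps maps every task id to the ids it depends on
     * (an empty list for tasks with no dependencies). Returns one valid execution order.
     */
    static List<String> topoSort(Map<String, List<String>> deps) {
        Map<String, Integer> remaining = new HashMap<>();       // unfinished dependencies per task
        Map<String, List<String>> dependents = new HashMap<>(); // reverse edges: dep -> tasks waiting on it
        for (Map.Entry<String, List<String>> e : deps.entrySet()) {
            remaining.put(e.getKey(), e.getValue().size());
            for (String dep : e.getValue()) {
                dependents.computeIfAbsent(dep, k -> new ArrayList<>()).add(e.getKey());
            }
        }

        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : remaining.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String task = ready.poll();
            order.add(task); // a real scheduler would execute the task here
            for (String next : dependents.getOrDefault(task, List.of())) {
                // Decrement and test in one step; 0 means its last dependency just finished.
                if (remaining.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        if (order.size() != deps.size()) {
            throw new IllegalArgumentException("dependency graph has a cycle");
        }
        return order;
    }

    public static void main(String[] args) {
        // Same graph as the answer's main(): key = task, value = tasks it depends on.
        Map<String, List<String>> deps = Map.of(
                "1", List.of(), "2", List.of(), "3", List.of(),
                "4", List.of("1"), "5", List.of("1", "2", "3"),
                "6", List.of("3", "4"), "7", List.of("5", "6"), "8", List.of("5"));
        System.out.println(topoSort(deps));
    }
}
```

The parallel version in this answer keeps the same bookkeeping, but instead of polling a queue it hands each ready task to a thread pool, and the dependency counter is updated in taskCompleted() whenever a task finishes.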

The key is to run in parallel every task whose dependencies have all been executed.
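One straightforward reading of that rule is to process the graph in "waves": run every currently ready task in parallel, wait for all of them, then compute the next ready set. This sketch assumes the same map-of-dependencies input plus a hypothetical map from task id to Runnable; WaveExecutor and runInWaves are illustrative names, and this barrier-based variant is a simplification, not the answer's DependencyManager.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WaveExecutor {
    /**
     * Runs every task whose dependencies have all finished as one parallel "wave", waits for the
     * whole wave, then computes the next one. Returns the waves in the order they ran.
     */
    static List<List<String>> runInWaves(Map<String, List<String>> deps, Map<String, Runnable> work,
                                         ExecutorService pool)
            throws InterruptedException, ExecutionException {
        Map<String, Integer> remaining = new HashMap<>();       // unfinished dependencies per task
        Map<String, List<String>> dependents = new HashMap<>(); // reverse edges
        List<String> wave = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : deps.entrySet()) {
            remaining.put(e.getKey(), e.getValue().size());
            if (e.getValue().isEmpty()) {
                wave.add(e.getKey()); // no dependencies: part of the first wave
            }
            for (String dep : e.getValue()) {
                dependents.computeIfAbsent(dep, k -> new ArrayList<>()).add(e.getKey());
            }
        }

        List<List<String>> waves = new ArrayList<>();
        int executed = 0;
        while (!wave.isEmpty()) {
            List<Callable<Void>> jobs = new ArrayList<>();
            for (String task : wave) {
                jobs.add(() -> { work.get(task).run(); return null; });
            }
            // invokeAll blocks until every job in the wave is done; get() rethrows task failures.
            for (Future<Void> f : pool.invokeAll(jobs)) {
                f.get();
            }
            waves.add(wave);
            executed += wave.size();

            List<String> next = new ArrayList<>();
            for (String task : wave) {
                for (String d : dependents.getOrDefault(task, List.of())) {
                    if (remaining.merge(d, -1, Integer::sum) == 0) {
                        next.add(d); // its last dependency finished in this wave
                    }
                }
            }
            wave = next;
        }
        if (executed != deps.size()) {
            throw new IllegalArgumentException("dependency graph has a cycle");
        }
        return waves;
    }

    public static void main(String[] args) throws Exception {
        Map<String, List<String>> deps = Map.of(
                "1", List.of(), "2", List.of(), "3", List.of(),
                "4", List.of("1"), "5", List.of("1", "2", "3"),
                "6", List.of("3", "4"), "7", List.of("5", "6"), "8", List.of("5"));
        Map<String, Runnable> work = new HashMap<>();
        deps.keySet().forEach(id -> work.put(id, () -> System.out.println("running " + id)));
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(runInWaves(deps, work, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Waves are easy to reason about, but each wave waits for its slowest task. With the answer's graph and uniform 2-second tasks the waves are {1, 2, 3}, {4, 5}, {6, 8}, {7}, so this also takes about 8 seconds; with uneven durations, the DependencyManager approach, which submits a task the moment its last dependency finishes, can start work earlier.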

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;


class DependencyManager {
private final ConcurrentHashMap<String, List<String>> _dependencies = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, List<String>> _reverseDependencies = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Runnable> _tasks = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Integer> _numDependenciesExecuted = new ConcurrentHashMap<>();
private final  AtomicInteger _numTasksExecuted = new AtomicInteger(0);
private final ExecutorService _executorService = Executors.newFixedThreadPool(16);

private static Runnable getRunnable(DependencyManager dependencyManager, String taskId) {
    return () -> {
        try {
            Thread.sleep(2000);  // Placeholder business logic: every task takes 2 seconds.
            dependencyManager.taskCompleted(taskId);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    };
}

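/**
* Registers a task that has no edges yet, e.g. one that is disconnected from the rest of the graph.
* Calling it again for an existing task keeps the existing entries.
* @param taskId The task id
*/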
public void addVertex(String taskId) {
    _dependencies.putIfAbsent(taskId, new ArrayList<>());
    _reverseDependencies.putIfAbsent(taskId, new ArrayList<>());
    _tasks.putIfAbsent(taskId, getRunnable(this, taskId));
    _numDependenciesExecuted.putIfAbsent(taskId, 0);
}

private void addEdge(String dependentTaskId, String dependeeTaskId) {
    _dependencies.get(dependentTaskId).add(dependeeTaskId);
    _reverseDependencies.get(dependeeTaskId).add(dependentTaskId);
}

public void addDependency(String dependentTaskId, String dependeeTaskId) {
    addVertex(dependentTaskId);
    addVertex(dependeeTaskId);
    addEdge(dependentTaskId, dependeeTaskId);
}

private void taskCompleted(String taskId) {
    System.out.println(String.format("%s:: Task %s done!!", Instant.now(), taskId));
    // incrementAndGet() is atomic, so exactly one thread sees the final count below.
    int numTasksExecuted = _numTasksExecuted.incrementAndGet();
    _reverseDependencies.get(taskId).forEach(nextTaskId -> {
        // computeIfPresent() updates and returns the new count atomically. Reading it back with a
        // separate get() would let two threads both see "all done" and submit the task twice.
        int numDependenciesExecuted =
                _numDependenciesExecuted.computeIfPresent(nextTaskId, (__, currValue) -> currValue + 1);
        int numDependencies = _dependencies.get(nextTaskId).size();
        if (numDependenciesExecuted == numDependencies) {
            // All dependencies have been executed, so we can submit this task to the thread pool.
            _executorService.submit(_tasks.get(nextTaskId));
        }
    });
    if (numTasksExecuted == _tasks.size()) {
        topoSortCompleted();
    }
}

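private void topoSortCompleted() {
    System.out.println(String.format("%s:: Topo sort complete!!", Instant.now()));
    // Every task has finished: stop accepting work and let the pool's threads exit.
    _executorService.shutdown();
}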

public void executeTopoSort() {
    System.out.println(String.format("%s:: Topo sort started!!", Instant.now()));
    _dependencies.forEach((taskId, dependencies) -> {
    if (dependencies.isEmpty()) {
        _executorService.submit(_tasks.get(taskId));
    }
    });
}
}

public class TestParallelTopoSort {

public static void main(String[] args) {
    DependencyManager dependencyManager = new DependencyManager();
    dependencyManager.addDependency("8", "5");
    dependencyManager.addDependency("7", "5");
    dependencyManager.addDependency("7", "6");
    dependencyManager.addDependency("6", "3");
    dependencyManager.addDependency("6", "4");
    dependencyManager.addDependency("5", "1");
    dependencyManager.addDependency("5", "2");
    dependencyManager.addDependency("5", "3");
    dependencyManager.addDependency("4", "1");
    dependencyManager.executeTopoSort();
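    // executeTopoSort() returns immediately; the pool's non-daemon threads keep the JVM
    // alive until topoSortCompleted() shuts the pool down.
    // Each task takes 2 seconds, so the parallel version takes about 8 seconds
    // (longest chain: 1 -> 4 -> 6 -> 7); a serial run of all 8 tasks would take 16 seconds.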

}
}
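The timing claim in the comments of main can be checked by computing the critical path, i.e. the longest chain of dependent tasks weighted by duration. This sketch (CriticalPath is a hypothetical helper, not part of the answer) assumes every task takes the 2 seconds used in getRunnable, an acyclic graph, and at least as many threads as tasks that are ever ready at once (3 here, versus the 16-thread pool); it ignores scheduling overhead.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CriticalPath {
    /** Earliest finish time of a task if every task starts as soon as all its dependencies finish. */
    static long earliestFinish(String task, Map<String, List<String>> deps,
                               Map<String, Long> duration, Map<String, Long> memo) {
        Long cached = memo.get(task);
        if (cached != null) {
            return cached;
        }
        long start = 0;
        for (String dep : deps.get(task)) {
            start = Math.max(start, earliestFinish(dep, deps, duration, memo));
        }
        long finish = start + duration.get(task);
        memo.put(task, finish);
        return finish;
    }

    /** Length of the longest dependency chain: the best possible makespan with enough threads. */
    static long criticalPath(Map<String, List<String>> deps, Map<String, Long> duration) {
        Map<String, Long> memo = new HashMap<>();
        long makespan = 0;
        for (String task : deps.keySet()) {
            makespan = Math.max(makespan, earliestFinish(task, deps, duration, memo));
        }
        return makespan;
    }

    public static void main(String[] args) {
        // Same graph as the answer's main(): key = task, value = tasks it depends on.
        Map<String, List<String>> deps = Map.of(
                "1", List.of(), "2", List.of(), "3", List.of(),
                "4", List.of("1"), "5", List.of("1", "2", "3"),
                "6", List.of("3", "4"), "7", List.of("5", "6"), "8", List.of("5"));
        Map<String, Long> duration = new HashMap<>();
        deps.keySet().forEach(id -> duration.put(id, 2L)); // each task sleeps 2 s in getRunnable
        long serial = duration.values().stream().mapToLong(Long::longValue).sum();
        System.out.println("parallel: " + criticalPath(deps, duration) + " s, serial: " + serial + " s");
        // prints "parallel: 8 s, serial: 16 s"
    }
}
```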

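The directed acyclic graph constructed in this example has these edges (X -> Y means that Y depends on X): 1 -> 4, 1 -> 5, 2 -> 5, 3 -> 5, 3 -> 6, 4 -> 6, 5 -> 7, 5 -> 8, 6 -> 7.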
