需要帮助在D中并行遍历dag [英] Need help parallel traversing a dag in D

查看:125
本文介绍了需要帮助在D中并行遍历dag的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

好的,这是一个简单得多的示例,说明了我的问题.为什么只有第一个任务排在队列中?

OK, here's a much much simpler example illustrating my problem. why is only the first task ever put onto the queue?

import std.stdio;
import std.parallelism;

void simpleWorker(uint depth, uint maxDepth, TaskPool pool){
    writeln("Depth is: ",depth);
    if (++depth < maxDepth){
        pool.put( task!simpleWorker(depth,maxDepth,pool));
    }
}

void main(){
    auto pool = new TaskPool();
    pool.put(task!simpleWorker(0,5,pool));
    pool.finish(true);
    writeln("Done");
}

原文:

我需要遍历此DAG.当我访问一个节点时,我会对其进行清理.在节点的所有父节点都干净之前,我无法对其进行清理.

I need to traverse this DAG. When I visit a node I clean it. I can't clean a node until all of its parents are clean.

我尝试的方法是让工作线程的当前节点检查其所有子节点,以查看可以处理的子节点.任何可以处理的都将添加到TaskPool中.

The way I'm attempting is to have the worker thread's current node check all of its children to see which ones can be processed. Any that can be processed are added to the TaskPool.

我的问题是我不知道如何将新任务添加到TaskPool并进行处理.这只是清理DAG中的第一个节点,然后退出,使其他所有内容变脏.

My problem is I can't figure out how to add new tasks to the TaskPool and get them processed. This just cleans the first node in the DAG, and then exits, leaving everything else dirty.

void cleanNode(Node node, TaskPool pool){
    node.doProcess();
    foreach (client; node.clients){
        if (client.canProcess()){
            pool.put(task!cleanNode(client, pool));
        }
    }
}

void main(){
    auto dag = mkTestDag(5);
    auto pool = new TaskPool();

    pool.put( task!cleanNode(dag[0], pool));
    pool.finish(true); 

    writeln("\n\nOutput:");
    foreach (d;dag){
        writeln(d);
        writeln(d.dirty ? "dirty" : "clean","\n");
    }
}

完整代码在这里: http://pastebin.com/LLfMyKVp

推荐答案

这是因为从simpleWorker中的放置引发了错误.

It is because an Error is being thrown from the put inside simpleWorker.

此版本显示错误:

import std.stdio;
import std.parallelism;

void simpleWorker(uint depth, uint maxDepth, TaskPool pool){
    writeln("Depth is: ",depth);
    if (++depth < maxDepth){
        try {
            pool.put( task!simpleWorker(depth,maxDepth,pool));
        } catch (Error e) {
            writeln("Fail: ", e.msg);
        }
    }
}

void main(){
    auto pool = new TaskPool();
    pool.put(task!simpleWorker(0,5,pool));
    pool.finish(true);
    writeln("Done");
}

输出:

Depth is: 0
Fail: Cannot submit a new task to a pool after calling finish() or stop().
Done

希望其他人可以解释使用TaskPool的正确方法.

Hopefully someone else can explain the correct way to use TaskPool.

修改

通过告诉任务像这样运行来使其正常工作:

Got it working by telling the tasks to run like this:

import std.stdio;
import std.parallelism;

void simpleWorker(uint depth, uint maxDepth, TaskPool pool){
    writeln("Depth is: ",depth);
    if (++depth < maxDepth){
        try 
        {
            auto subWorker = task!simpleWorker(depth,maxDepth, pool);
            pool.put(subWorker);
            subWorker.yieldForce();
        } catch (Error t) {
            writeln("Fail: (",  typeof(t).stringof, ") ", t.msg);
        }
    }
}

void main(){
    auto pool = new TaskPool();

    auto worker = task!simpleWorker(0,5, pool);
    pool.put(worker);
    worker.yieldForce();

    pool.finish(true);
    writeln("Done");
}

输出:

Depth is: 0
Depth is: 1
Depth is: 2
Depth is: 3
Depth is: 4
Done

这篇关于需要帮助在D中并行遍历dag的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆