任务结合结果并继续 [英] Tasks combine result and continue

查看:55
本文介绍了任务结合结果并继续的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有 16 个任务在做同样的工作,每个任务都返回一个数组.我想将结果成对组合并做同样的工作,直到我只有一项任务.我不知道这样做的最佳方法是什么.

I have 16 tasks doing the same job, each of them return an array. I want to combine the results in pairs and do same job until I have only one task. I don't know what is the best way to do this.

public static IComparatorNetwork[] Prune(IComparatorNetwork[] nets, int numTasks)
    {
        var tasks = new Task[numTasks];
        var netsPerTask = nets.Length/numTasks;
        var start = 0;
        var concurrentSet = new ConcurrentBag<IComparatorNetwork>();
        
        for(var i = 0; i  < numTasks; i++)
        {
            IComparatorNetwork[] taskNets;
            if (i == numTasks - 1)
            {
                taskNets = nets.Skip(start).ToArray();                 
            }
            else
            {
                taskNets = nets.Skip(start).Take(netsPerTask).ToArray();
            }

            start += netsPerTask;
            tasks[i] = Task.Factory.StartNew(() =>
            {
                var pruner = new Pruner();
                concurrentSet.AddRange(pruner.Prune(taskNets));
            });
        }

        Task.WaitAll(tasks.ToArray());

        if(numTasks > 1)
        {
            return Prune(concurrentSet.ToArray(), numTasks/2);
        }

        return concurrentSet.ToArray();
    }

现在我正在等待所有任务完成,然后我重复一半的任务,直到我只有一个.我不想在每次迭代中都等待.我对并行编程很陌生,可能这种方法很糟糕.我试图并行化的代码如下:

Right now I am waiting for all tasks to complete then I repeat with half of the tasks until I have only one. I would like to not have to wait for all on each iteration. I am very new with parallel programming probably the approach is bad. The code I am trying to parallelize is the following:

public IComparatorNetwork[] Prune(IComparatorNetwork[] nets)
    {
        var result = new List<IComparatorNetwork>();

        for (var i = 0; i < nets.Length; i++) 
        {
            var isSubsumed = false;

            for (var index = result.Count - 1; index >= 0; index--)
            {
                var n = result[index];

                if (nets[i].IsSubsumed(n))
                {
                    isSubsumed = true;
                    break;
                }

                if (n.IsSubsumed(nets[i]))
                {
                    result.Remove(n);
                }
            }

            if (!isSubsumed) 
            {
                result.Add(nets[i]);
            }
        }

        return result.ToArray();
    }`

推荐答案

因此,您在这里所做的基本上是聚合值,但同时进行.幸运的是,PLINQ 已经有一个并行工作的聚合实现.因此,在您的情况下,您可以简单地将原始数组中的每个元素包装在其自己的一个元素数组中,然后您的 Prune 操作能够将任意两个网络数组组合成一个新的单个数组.>

So what you're fundamentally doing here is aggregating values, but in parallel. Fortunately, PLINQ already has an implementation of Aggregate that works in parallel. So in your case you can simply wrap each element in the original array in its own one element array, and then your Prune operation is able to combine any two arrays of nets into a new single array.

public static IComparatorNetwork[] Prune(IComparatorNetwork[] nets)
{
    return nets.Select(net => new[] { net })
        .AsParallel()
        .Aggregate((a, b) => new Pruner().Prune(a.Concat(b).ToArray()));
}

我对他们的聚合方法的内部结构不是很了解,但我认为它可能相当不错并且不会花费大量时间进行不必要的等待.但是,如果你想自己编写,这样你就可以确保工作人员总是在他们的新工作出现时立即引入新工作,这是我自己的实现.随意在您的特定情况下比较两者,看看哪个最适合您的需求.请注意,PLINQ 可以通过多种方式进行配置,您可以随意尝试其他配置,看看哪种配置最适合您的情况.

I'm not super knowledgeable about the internals of their aggregate method, but I would imagine it's likely pretty good and doesn't spend a lot of time waiting unnecessarily. But, if you want to write your own, so that you can be sure the workers are always pulling in new work as soon as their is new work, here is my own implementation. Feel free to compare the two in your specific situation to see which performs best for your needs. Note that PLINQ is configurable in many ways, feel free to experiment with other configurations to see what works best for your situation.

public static T AggregateInParallel<T>(this IEnumerable<T> values, Func<T, T, T> function, int numTasks)
{
    Queue<T> queue = new Queue<T>();
    foreach (var value in values)
        queue.Enqueue(value);
    if (!queue.Any())
        return default(T);  //Consider throwing or doing something else here if the sequence is empty

    (T, T)? GetFromQueue()
    {
        lock (queue)
        {
            if (queue.Count >= 2)
            {
                return (queue.Dequeue(), queue.Dequeue());
            }
            else
            {
                return null;
            }
        }
    }

    var tasks = Enumerable.Range(0, numTasks)
        .Select(_ => Task.Run(() =>
        {
            var pair = GetFromQueue();
            while (pair != null)
            {
                var result = function(pair.Value.Item1, pair.Value.Item2);
                lock (queue)
                {
                    queue.Enqueue(result);
                }
                pair = GetFromQueue();
            }
        }))
        .ToArray();
    Task.WaitAll(tasks);
    return queue.Dequeue();
}

此版本的调用代码如下所示:

And the calling code for this version would look like:

public static IComparatorNetwork[] Prune2(IComparatorNetwork[] nets)
{
    return nets.Select(net => new[] { net })
        .AggregateInParallel((a, b) => new Pruner().Prune(a.Concat(b).ToArray()), nets.Length / 2);
}

正如评论中提到的,你可以让修剪器的 Prune 方法更高效,让它接受两个集合,而不仅仅是一个集合,并且只比较每个集合中的项目,知道所有来自同一集合的项目不会包含该集合中的任何其他项目.这使得该方法不仅更短、更简单、更容易理解,而且还消除了相当一部分昂贵的比较.一些小的调整也可以大大减少创建的中间集合的数量.

As mentioned in comments, you can make the pruner's Prune method much more efficient by having it accept two collections, not just one, and only comparing items from each collection with the other, knowing that all items from the same collection will not subsume any others from that collection. This makes the method not only much shorter, simpler, and easier to understand, but also removes a sizeable portion of the expensive comparisons. A few minor adaptations can also greatly reduce the number of intermediate collections created.

public static IReadOnlyList<IComparatorNetwork> Prune(IReadOnlyList<IComparatorNetwork> first, IReadOnlyList<IComparatorNetwork> second)
{
    var firstItemsNotSubsumed = first.Where(outerNet => !second.Any(innerNet => outerNet.IsSubsumed(innerNet)));
    var secondItemsNotSubsumed = second.Where(outerNet => !first.Any(innerNet => outerNet.IsSubsumed(innerNet)));
    return firstItemsNotSubsumed.Concat(secondItemsNotSubsumed).ToList();
}

调用代码只需要稍作修改以确保类型匹配,并且您传入两个集合而不是先连接它们.

With the the calling code just needs minor adaptations to ensure the types match up and that you pass in both collections rather than concatting them first.

public static IReadOnlyList<IComparatorNetwork> Prune(IReadOnlyList<IComparatorNetwork> nets)
{
    return nets.Select(net => (IReadOnlyList<IComparatorNetwork>)new[] { net })
        .AggregateInParallel((a, b) => Pruner.Prune(a, b), nets.Count / 2);
}

这篇关于任务结合结果并继续的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆