与有序输入Parallel.ForEach? [英] Parallel.ForEach with ordered input?

查看:223
本文介绍了与有序输入Parallel.ForEach?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在程序中使用的 Parallel.ForEach()语句。它使用了一些对象作为输入的列表。我不关心输出的命令,但我需要这个环带以相同的顺序输入元件在输入列表中。是否有可能与 Parallel.ForEach()实现这一点

I have a Parallel.ForEach() statement used in my program. It uses a list of some objects as input. I don't care about output order, but I need this loop to take input elements in the same order as in the input list. Is it possible to achieve this with Parallel.ForEach()?

推荐答案

在你从你的的IEnumerable&LT需要preserve秩序的情况下,T> ,你可能会想实施的 = http://msdn.microsoft.com/en-us/library/dd394988.aspx相对=nofollow> OrderablePartitioner< T> 品种。这个例子code这个类包含一个简单的例子,检索它们逐一从枚举时间递增的顺序。

In the case where you need to preserve order from your IEnumerable<T>, you'll likely want to implement a custom partitioner of the OrderablePartitioner<T> variety. The example code for this class includes a simple example which retrieves them one at a time from the enumeration in increasing order.

不过,这是很多的工作的一些东西,可能是一个简单的生产者 - 消费者模型类似的 ConcurrentQueue&LT; T&GT;

However, this is a lot of work for something which could be a simple producer-consumer model with something like ConcurrentQueue<T>:

var queue = new ConcurrentQueue<X>(yourEnumerableOfX);
Action consumer = () =>
{
    X x;
    while (queue.TryDequeue(out x))
    {
        x.Frob();
    }
};

// At most N "in flight"
int maxParallelism = Environment.ProcessorCount;
var consumers = Enumerable.Repeat(consumer, maxParallelism).ToArray();
Parallel.Invoke(consumers);

使用这个code,你会得到保证先入先出的行为,你的请求最终处理飞行近的顺序被接收。一旦放置在平行的,你就没有保证他们留在顺序。

Using this code you'll be guaranteed First-In-First-Out behavior, and that your requests end up processed "in flight" nearly in the order they are received. Once placed in parallel, you'll have no guarantee they stay in sequential order.

另外,您也可以使用以下(与队列项目的数量保持固定的限制):

Alternatively, you can use the following (with the restriction that the number of queue items stays fixed):

// Executes exactly queue.Count iterations at the time of Parallel.ForEach
// due to "snapshot" isolation of ConcurrentQueue<X>.GetEnumerator()
var queue = new ConcurrentQueue<X>(yourEnumerableOfX);
Parallel.ForEach(
    queue,
    _ =>
    {
        X x;
        if (queue.TryDequeue(out x))
        {
            x.Frob();
        }
    });

如果您想继续生产在一个线程中,与消费在其他国家使用的 BlockingCollection&LT; T&GT; 与队列作为其背后的集合:

If you would like to keep on producing in one thread, and consuming in others use a BlockingCollection<T> with a queue as its backing collection:

var queue = new BlockingCollection<X>(new ConcurrentQueue<X>());

// add to it
Task.Factory.StartNew( () =>
    {
         foreach (var x in yourEnumerableOfX)
         {
             queue.Add(x);
             Thread.Sleep(200);
         }

         // Signal to our consumers we're done:
         queue.CompleteAdding();
    });

现在我们需要'无界'的消费者,因为我们不能确定到底有多少队列中的项目可能是present:

Now we need 'unbounded' consumers since we are unsure exactly how many queue items may be present:

// Roughly the same consumer code as above, but 'unbounded'
Action consumer = () =>
{
    while (!queue.IsCompleted)
    {
        X x;
        try
        {
            // blocking form, switch to TryTake and maybe Thread.Sleep()
            x = queue.Take();
        }
        catch (InvalidOperationException)
        {
            // none left
            break;
        }

        x.Frob();
    }
};

int maxParallelism = Environment.ProcessorCount;
var consumers = Enumerable.Repeat(consumer, maxParallelism).ToArray();
Parallel.Invoke(consumers);

这篇关于与有序输入Parallel.ForEach?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆