在Parallel.ForEach中嵌套等待 [英] Nesting await in Parallel.ForEach

查看:145
本文介绍了在Parallel.ForEach中嵌套等待的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在Metro应用中,我需要执行许多WCF调用.需要进行大量调用,因此我需要在并行循环中进行调用.问题在于并行循环在WCF调用全部完成之前退出.

In a metro app, I need to execute a number of WCF calls. There are a significant number of calls to be made, so I need to do them in a parallel loop. The problem is that the parallel loop exits before the WCF calls are all complete.

您如何将其重构为预期的效果?

How would you refactor this to work as expected?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();

推荐答案

Parallel.ForEach()背后的整个想法是,您拥有一组线程,每个线程处理集合的一部分.如您所见,这不适用于async-await,因为您要在异步调用期间释放线程.

The whole idea behind Parallel.ForEach() is that you have a set of threads and each thread processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the async call.

您可以通过阻止ForEach()线程来修复"该问题,但这会破坏async-await的全部观点.

You could "fix" that by blocking the ForEach() threads, but that defeats the whole point of async-await.

您可以使用 TPL数据流代替Parallel.ForEach(),它很好地支持异步Task.

What you could do is to use TPL Dataflow instead of Parallel.ForEach(), which supports asynchronous Tasks well.

具体来说,可以使用TransformBlock编写代码,该代码使用async lambda将每个id转换为Customer.可以将该块配置为并行执行.您可以将该块链接到一个ActionBlock,该ActionBlock将每个Customer写入控制台. 设置块网络后,可以将每个ID Post()分配到TransformBlock.

Specifically, your code could be written using a TransformBlock that transforms each id into a Customer using the async lambda. This block can be configured to execute in parallel. You would link that block to an ActionBlock that writes each Customer to the console. After you set up the block network, you can Post() each id to the TransformBlock.

在代码中:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

尽管您可能希望将TransformBlock的并行性限制为一些小的常数.另外,您可以限制TransformBlock的容量,并使用SendAsync()异步向其中添加项目,例如,如果集合太大.

Although you probably want to limit the parallelism of the TransformBlock to some small constant. Also, you could limit the capacity of the TransformBlock and add the items to it asynchronously using SendAsync(), for example if the collection is too big.

与代码(如果可行)相比,另一个好处是,写入将在单个项目完成后立即开始,而不必等到所有处理都完成了.

As an added benefit when compared to your code (if it worked) is that the writing will start as soon as a single item is finished, and not wait until all of the processing is finished.

这篇关于在Parallel.ForEach中嵌套等待的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆