在 Parallel.ForEach 中嵌套 await [英] Nesting await in Parallel.ForEach

查看:38
本文介绍了在 Parallel.ForEach 中嵌套 await的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在 Metro 应用程序中,我需要执行多个 WCF 调用.需要进行大量调用,因此我需要在并行循环中执行它们.问题是并行循环在 WCF 调用全部完成之前就退出了.

In a metro app, I need to execute a number of WCF calls. There are a significant number of calls to be made, so I need to do them in a parallel loop. The problem is that the parallel loop exits before the WCF calls are all complete.

您将如何重构它以使其按预期工作?

How would you refactor this to work as expected?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();

推荐答案

Parallel.ForEach() 背后的整个想法是你有一组线程,每个线程处理集合的一部分.正如您所注意到的,这不适用于 async-await,您希望在异步调用期间释放线程.

The whole idea behind Parallel.ForEach() is that you have a set of threads and each thread processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the async call.

您可以通过阻塞 ForEach() 线程来修复"这个问题,但这会破坏 async-await 的全部意义.

You could "fix" that by blocking the ForEach() threads, but that defeats the whole point of async-await.

您可以做的是使用 TPL Dataflow 而不是 Parallel.ForEach(),很好的支持异步Task.

What you could do is to use TPL Dataflow instead of Parallel.ForEach(), which supports asynchronous Tasks well.

具体来说,您的代码可以使用 TransformBlock 编写,该代码使用 async lambda 将每个 id 转换为 Customer.该块可以配置为并行执行.您可以将该块链接到将每个 Customer 写入控制台的 ActionBlock.设置块网络后,您可以Post()每个id到TransformBlock.

Specifically, your code could be written using a TransformBlock that transforms each id into a Customer using the async lambda. This block can be configured to execute in parallel. You would link that block to an ActionBlock that writes each Customer to the console. After you set up the block network, you can Post() each id to the TransformBlock.

在代码中:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

尽管您可能希望将 TransformBlock 的并行性限制为某个小常量.此外,您可以限制 TransformBlock 的容量并使用 SendAsync() 向其中异步添加项目,例如,如果集合太大.

Although you probably want to limit the parallelism of the TransformBlock to some small constant. Also, you could limit the capacity of the TransformBlock and add the items to it asynchronously using SendAsync(), for example if the collection is too big.

与您的代码(如果它有效)相比的一个额外好处是,写入将在单个项目完成后立即开始,而不是等到所有处理完成.

As an added benefit when compared to your code (if it worked) is that the writing will start as soon as a single item is finished, and not wait until all of the processing is finished.

这篇关于在 Parallel.ForEach 中嵌套 await的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆