在Parallell.ForEach嵌套的await [英] Nesting await in Parallell.ForEach

查看:1263
本文介绍了在Parallell.ForEach嵌套的await的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在城域应用程序,我需要执行一些WCF的调用。要发出的呼叫的数目是足够的,我需要做他们在并行循环。在awaitable WCF调用返回之前的问题是存在的并行循环。

你会

如何重构这达到预期效果?

 变种的id =新列表与所述;串GT;(){1,2,3,4,5,6,7 ,8,9,10};
VAR的客户=新System.Collections.Concurrent.BlockingCollection<客户>();Parallel.ForEach(IDS,异步I =>
{
    ICustomerRepo回购=新CustomerRepo();
    VAR卡斯特=等待repo.GetCustomer(I)
    customers.Add(卡斯特);
});的foreach(在客户VAR的客户)
{
    Console.WriteLine(customer.ID);
}Console.ReadKey();


解决方案

背后 Parallel.ForEach()整个想法是,你有一组线程,每个进程集合的一部分。当你注意到了,这不符合异步工作方式 - 等待,要释放线程的时间的异步调用。

您可以修复,通过阻断的ForEach()线程,但击败整点异步 - 的await

你可以做的就是用的 TPL数据流而不是 Parallel.ForEach(),支持异步工作一报还一。

具体来说,您的code可以用写的 TransformBlock 是将每个ID成客户使用 异步拉姆达。这个块可以被配置以并行执行。你会是块链接到 ActionBlock 写入每个客户到控制台。
当您设置的挡网,您可以发布()每个ID的 TransformBlock

在code:

  VAR IDS =新的List<串GT; {1,2,3,4,5,6,7,8,9,10};VAR getCustomerBlock =新TransformBlock<字符串,客户和GT;(
    异步I =>
    {
        ICustomerRepo回购=新CustomerRepo();
        返回等待repo.GetCustomer(ⅰ);
    },新ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
VAR writeCustomerBlock =新ActionBlock<客户>(C => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock,新DataflowLinkOptions
    {
        PropagateCompletion =真
    });的foreach(IDS中VAR ID)
    getCustomerBlock.Post(ID);getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

虽然你可能想的 TransformBlock 并行限制一些小恒。此外,您可以限制的能力 TransformBlock 和异步使用添加项目到它 SendAsync(),例如如果集合过大。

由于相比,您的code(如果它的工作),一个额外的好处就是写作将尽快单个项目完成后开始,而不要等到所有的处理完成。

In a metro app, I need to execute a number of WCF calls. The number of calls to be made are enough that I need to do them in a parallel loop. The problem is the parallel loop exists before the awaitable wcf calls return.

How would you refactor this to work as expected?

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();

Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});

foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}

Console.ReadKey();

解决方案

The whole idea behind Parallel.ForEach() is that you have a set of threads and each processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the async call.

You could "fix" that by blocking the ForEach() threads, but that defeats the whole point of async-await.

What you could do is to use TPL Dataflow instead of Parallel.ForEach(), which supports asynchronous Tasks well.

Specifically, your code could written using a TransformBlock that transforms each id into a Customer using the async lambda. This block can be configured to execute in parallel. You would link that block to an ActionBlock that writes each Customer to the console. After you set up the block network, you can Post() each id to the TransformBlock.

In code:

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };

var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

Although you probably want to limit the parallelism of the TransformBlock to some small constant. Also, you could limit the capacity of the TransformBlock and add the items to it asynchronously using SendAsync(), for example if the collection is too big.

As an added benefit when compared to your code (if it worked) is that the writing will start as soon as a single item is finished, and not wait until all of the processing is finished.

这篇关于在Parallell.ForEach嵌套的await的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆