如何做Task.WhenAny时产生回报的项目 [英] How to yield return item when doing Task.WhenAny

查看:241
本文介绍了如何做Task.WhenAny时产生回报的项目的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在我的解决方案两个项目:WPF项目和类库

I have two projects in my solution: WPF project and class library.

在我的类库:

我有符号的列表:

class Symbol
{
     Identifier Identifier {get;set;}
     List<Quote> HistoricalQuotes {get;set;}
     List<Financial> HistoricalFinancials {get;set;}
}

有关每个符号,我查询金融服务检索使用的WebRequest我的符号中的每一个历史财务数据。 (webClient.DownloadStringTaskAsync(URI);)

For each symbol, I query a financial service to retrieve historical financial data for each one of my symbols using a webrequest. (webClient.DownloadStringTaskAsync(uri);)

因此​​,这里是我的方法,做到这一点:

So here's my method which do that:

    public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
    {
        var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

        foreach (var symbol in await _listSymbols)
        {
            historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
        }

        while (historicalFinancialTask.Count > 0)
        {
            var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
            historicalFinancialTask.Remove(historicalFinancial);

            // the line below doesn't compile, which is understandable because method's return type is a Task of something
            yield return new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data); 
        }
    }

    private async Task<HistoricalFinancialResult> GetFinancialsQueryAsync(Symbol symbol)
    {
        var result = new HistoricalFinancialResult();
        result.Symbol = symbol;
        result.Data = await _financialsQuery.GetFinancialsQuery(symbol.Identifier); // contains some logic like parsing and use WebClient to query asynchronously
        return result;
    }

    private class HistoricalFinancialResult
    {
        public Symbol Symbol { get; set; }
        public IEnumerable<Financial> Data { get; set; }

        // equality members
    }

正如你所看到的,我想,每次我下载一个财务历史数据每个符号,产生的结果,而不是等待我的所有呼叫金融服务来完成。

As you can see, I want that each time I download a Financial historical data per symbol, to yield the result instead of waiting for all my calls to financial service to complete.

在我的WPF中,这是我想做些什么:

And in my WPF, here's what I would like to do:

foreach(var symbol in await _service.GetSymbolsAsync())
{
      SymbolsObservableCollection.Add(symbol);
}

看来,我们不能屈服在异步方法的回报,我可以用那什么解决办法吗?除了移动我的GetSymbols方法到我的WPF项目。

It seems we can't yield return in an async method, then what solution can I use? Except moving my GetSymbols method into my WPF project.

推荐答案

虽然我喜欢的TPL数据流组件(这svick建议您使用),移动到该系统确实需要一个实质性承诺 - 这是不是你可以只添加到现有的设计。如果你正在执行CPU密集型数据处理的高产量,并希望利用多CPU内核,提供可观的效益。但是,得到最好的出它是不平凡的。

While I like the TPL Dataflow components (which svick suggests you use), moving over to that system does require a substantial commitment - it's not something you can just add to an existing design. It offers considerable benefits if you're performing high volumes of CPU-intensive data processing and want to exploit many CPU cores. But getting the best out of it is non-trivial.

他的其他建议,使用RX,可能会更容易与现有的解决方案集成。 (请参阅原始文件,但最新的code,使用的Rx-Main 的NuGet包或者,如果你想看看源,看到的在Rx codePLEX网站)它甚至有可能为调用code到使用携带的IEnumerable&LT;符号与GT; 如果你想 - 你可以用接收纯粹是一个实现细节,[修改2013年11月9日补充:]虽然身为svick指出,这可能不是一个好主意,因为你的最终目标。

His other suggestion, using Rx, might be easier to integrate with an existing solution. (See the original documentation, but for the latest code, use the Rx-Main nuget package. Or if you'd like to look at the source, see the Rx CodePlex site) It would even be possible for the calling code to carry on using an IEnumerable<Symbol> if you want - you can use Rx purely as an implementation detail, [edit 2013/11/09 to add:] although as svick has pointed out, that's probably not a good idea, given your end goal.

在我告诉你一个例​​子,我想清楚正是我们正在做的事情。你的榜样曾与该签名的方法:

Before I show you an example, I want to be clear about what exactly we're doing. Your example had a method with this signature:

public async Task<IEnumerable<Symbol>> GetSymbolsAsync()

这是返回类型,任务&LT; IEnumerable的&LT;符号&GT;&GT; ,从根本上说,这是一个生产型的一个结果的方法的IEnumerable&LT ;符号方式&gt; ,它可能不会立即产生结果

That return type, Task<IEnumerable<Symbol>>, essentially says "This is a method that produces a single result of type IEnumerable<Symbol>, and it may not produce that result immediately."

这是说的单个结果的一点,我认为是造成你悲伤,因为这不是你真正想要的东西。 A 任务&LT; T&GT; (无论是什么 T 而定)重新presents一个异步操作。它可能有很多步骤,但最终产生(的等待如果你实现它为C#异步方法很多用途)一件事。要生产多种东西,在不同的,时间,所以任务&LT; T&GT; 不是一个不错的选择。

It's that single result bit that I think is causing you grief, because that's not really what you want. A Task<T> (no matter what T may be) represents a single asynchronous operation. It may have many steps (many uses of await if you implement it as a C# async method) but ultimately it produces one thing. You want to produce multiple things, at different, times, so Task<T> is not a good fit.

如果你真的要做你的方法签名承诺 - 最终产生一个结果 - 你可以做到这一点的一种方法是让你的异步方法建立一个列表,然后产生当它准备好了结果:

If you were really going to do what your method signature promises - producing one result eventually - one way you could do this is to have your async method build a list and then produce that as the result when it's good and ready:

// Note: this first example is *not* what you want.
// However, it is what your method's signature promises to do.
public async Task<IEnumerable<Symbol>> GetSymbolsAsync()
{
    var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

    foreach (var symbol in await _listSymbols)
    {
        historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
    }

    var results = new List<Symbol>();
    while (historicalFinancialTask.Count > 0)
    {
        var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
        historicalFinancialTask.Remove(historicalFinancial);

        results.Add(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data)); 
    }

    return results;
}

这个方法做它的签名这样说:它异步产生符号序列。

This method does what its signature says: it asynchronously produces a sequence of symbols.

不过,presumably你想创建一个的IEnumerable&LT;符号&GT; 产生的项目,因为它们变得可用,而不是等到他们都可用。 (否则,你可能也只是使用 WhenAll )。你可以这样做,但收益回报率是不一样

But presumably you'd like to create an IEnumerable<Symbol> that produces the items as they become available, rather than waiting until they're all available. (Otherwise, you might as well just use WhenAll.) You can do that, but yield return is not the way.

总之,你想要做什么,我认为是产生一个异步列表。还有一种类型为:的IObservable&LT; T&GT; 前presses正是我相信你希望前preSS与任务&LT ; IEnumerable的&LT;符号&GT;&GT; :这是一个序列的项目(就像的IEnumerable&LT; T&GT; ),但异步

In short, what I think you want to do is produce an asynchronous list. There's a type for that: IObservable<T> expresses exactly what I believe you were hoping to express with your Task<IEnumerable<Symbol>>: it's a sequence of items (just like IEnumerable<T>) but asynchronous.

这可能有助于通过类比理解它:

It may help to understand it by analogy:

public Symbol GetSymbol() ...

public Task<Symbol> GetSymbolAsync() ...

public IEnumerable<Symbol> GetSymbols() ...

是:

public IObservable<Symbol> GetSymbolsObservable() ...

(不幸的是,不像任务&LT; T&GT; 没有什么叫异步面向序列方法的通用命名约定我已经添加了'可观测在这里就结束了,但是这不是普遍的做法。当然,我不会把它 GetSymbolsAsync ,因为人们会期望返回一个任务

(Unfortunately, unlike with Task<T> there isn't a common naming convention for what to call an asynchronous sequence-oriented method. I've added 'Observable' on the end here, but that's not universal practice. I certainly wouldn't call it GetSymbolsAsync because people will expect that to return a Task.)

要换个说法,任务&LT; IEnumerable的&LT; T&GT;&GT; 表示,而<$ C当我准备好了,我会产生这种集 $ C>的IObservable&LT; T&GT; 说:这是一个集合的时候,我准备好了,我会产生每一个项目。

To put it another way, Task<IEnumerable<T>> says "I'll produce this collection when I'm good and ready" whereas IObservable<T> says: "Here's a collection. I'll produce each item when I'm good and ready."

所以,你要返回符号的对象,在这些对象异步产生的序列的方法。这告诉我们,你真的应该返回一个的IObservable&LT;符号与GT; 。下面是一个实现:

So, you want a method that returns a sequence of Symbol objects, where those objects are produced asynchronously. That tells us that you should really be returning an IObservable<Symbol>. Here's an implementation:

// Unlike this first example, this *is* what you want.
public IObservable<Symbol> GetSymbolsRx()
{
    return Observable.Create<Symbol>(async obs =>
    {
        var historicalFinancialTask = new List<Task<HistoricalFinancialResult>>();

        foreach (var symbol in await _listSymbols)
        {
            historicalFinancialTask.Add(GetFinancialsQueryAsync(symbol));
        }

        while (historicalFinancialTask.Count > 0)
        {
            var historicalFinancial = await Task.WhenAny(historicalFinancialTask);
            historicalFinancialTask.Remove(historicalFinancial);

            obs.OnNext(new Symbol(historicalFinancial.Result.Symbol.Identifier, historicalFinancial.Result.Symbol.HistoricalQuotes, historicalFinancial.Result.Data));
        }
    });
}

正如你所看到的,这可以让你写pretty你希望多写什么 - 这code的身体几乎是相同的你。唯一的区别是,如果你使用收益回报率(没有编译),这个调用 OnNext 方法通过接收供给的一个对象。

As you can see, this lets you write pretty much what you were hoping to write - the body of this code is almost identical to yours. The only difference is that where you were using yield return (which didn't compile), this calls the OnNext method on an object supplied by Rx.

已经写了,你可以很容易地把这个包起来的的IEnumerable&LT;符号&GT; ([编辑2013年11月29日补充:]虽然你可能不真的想做到这一点 - 看到另外在回答结束时):

Having written that, you can easily wrap this in an IEnumerable<Symbol> ([Edited 2013/11/29 to add:] although you probably don't actually want to do this - see addition at end of answer):

public IEnumerable<Symbol> GetSymbols()
{
    return GetSymbolsRx().ToEnumerable();
}

这可能看起来不同步,但它实际上使底层code异步操作。当你调用这个方法,也不会封锁 - 即使底层code,做获取财务信息不能立即产生结果的工作,这种方法仍然会立即返回一个的IEnumerable&LT;符号&GT ; 。现在当然,任何code,试图通过集合迭代将结束,如果数据尚不可用阻塞。但关键的是做什么,我想你原先尝试实现的:

This may not look asynchronous, but it does in fact allow the underlying code to operate asynchronously. When you call this method, it will not block - even if the underlying code that does the work of fetching the financial information cannot produce a result immediately, this method will nonetheless immediately return an IEnumerable<Symbol>. Now of course, any code that attempts to iterate through that collection will end up blocking if data is not yet available. But the critical thing is that does what I think you were originally trying to achieve:


  • 您去写异步方法做的工作(在我的例子代表,作为参数传递给 Observable.Create&LT; T&GT ; ,但你可以写一个独立的异步方法,如果您preFER)

  • 调用code将不仅仅是作为阻挡问你开始获取符号的结果

  • 生成的的IEnumerable&LT;符号与GT; 将尽快产生各个项目变得可用

  • You get to write an async method that does the work (a delegate in my example, passed as an argument to Observable.Create<T> but you could write a standalone async method if you prefer)
  • The calling code will not be blocked merely as result of asking you to start fetching the symbols
  • The resulting IEnumerable<Symbol> will produce each individual item as soon as it becomes available

这工作,因为接收的 ToEnumerable 法中有一些聪明的code桥接的IEnumerable&LT的同步世界观之间的差距; T&GT ; 和异步生产的结果。 (换句话说,这不正是你失望地发现C#没能为你做的。)

This works because Rx's ToEnumerable method has some clever code in it that bridges the gap between the synchronous world view of IEnumerable<T> and asynchronous production of results. (In other words, this does exactly what you were disappointed to discover C# wasn't able to do for you.)

如果你好奇,你可以看看源。在code,它underlies什么 ToEnumerable 也可以在<一个找到href=\"https://rx.$c$cplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs\">https://rx.$c$cplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs

If you're curious, you can look at the source. The code that underlies what ToEnumerable does can be found at https://rx.codeplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GetEnumerator.cs

[编辑2013年11月29日补充:]

svick指出在评论这是我错过了:你的最终目标是将内容放入的ObservableCollection&LT;符号与GT; 。不知怎的,我没有看到一点。这意味着的IEnumerable&LT; T&GT; 是错误的路要走 - 要填充的集合作为项目变得可用,而不是一个的foreach做通循环。所以你只是这样做:

svick has pointed out in the comments something I missed: your final goal is to put the contents into an ObservableCollection<Symbol>. Somehow I didn't see that bit. That means IEnumerable<T> is the wrong way to go - you want to populate the collection as items become available, rather than doing through with a foreach loop. So you'd just do this:

GetSymbolsRx().Subscribe(symbol => SymbolsObservableCollection.Add(symbol));

或类似的规定。因为当他们变得可用,将项添加到集合中。

or something along those lines. That will add items to the collection as and when they become available.

这要看整个事情的方式在UI线程正在拉开序幕。只要是你的异步code应该结束了在UI线程上运行,这意味着将项目添加到集合,这也发生在UI线程上。但是,如果由于某种原因,你最终会从工作线程启动的事情(或者,如果你使用 ConfigureAwait 在任何等待着,从而打破与UI线程的连接)你需要安排处理正确的线程从接收数据流中的项目:

This depends on the whole thing being kicked off on the UI thread by the way. As long as it is, your async code should end up running on the UI thread, meaning that when items are added to the collection, that also happens on the UI thread. But if for some reason you end up launching things from a worker thread (or if you were to use ConfigureAwait on any of the awaits, thus breaking the connection with the UI thread) you'd need to arrange to handle the items from the Rx stream on the right thread:

GetSymbolsRx()
    .ObserveOnDispatcher()
    .Subscribe(symbol => SymbolsObservableCollection.Add(symbol));

如果你是在UI线程当你这样做,它会拿起目前的调度,并确保所有通知,通过它到达。如果您已经在错误的线程上,当你来到订阅,您可以使用 ObserveOn 重载需要一个调度程序。 (这些都需要你有一个参考 System.Reactive.Windows.Threading 。而这些扩展方法,所以你需要一个使用其含有的命名空间,这也叫 System.Reactive.Windows.Threading

If you're on the UI thread when you do that, it'll pick up the current dispatcher, and ensure all notifications arrive through it. If you're already on the wrong thread when you come to subscribe, you can use the ObserveOn overload that takes a dispatcher. (These require you to have a reference to System.Reactive.Windows.Threading. And these are extension methods, so you'll need a using for their containing namespace, which is also called System.Reactive.Windows.Threading)

这篇关于如何做Task.WhenAny时产生回报的项目的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆