Converting an IEnumerable<T> to IObservable<T>, with a maximum degree of parallelism
Question
I have a sequence of async tasks to do (say, fetch N web pages). Now what I want is to expose them all as an IObservable<T>. My current solution uses the answer from this question (http://stackoverflow.com/questions/13500456/how-to-convert-an-ienumerabletaskt-to-iobservablet):
async Task<ResultObj> GetPage(string page) {
    Console.WriteLine("Before");
    var result = await FetchFromInternet(page);
    Console.WriteLine("After");
    return result;
}

// pages is an IEnumerable<string>
IObservable<ResultObj> resultObservable = pages
    .Select(GetPage)
    .Select(t => Observable.FromAsync(() => t))
    .Merge();

// Now consume the list
foreach (ResultObj obj in resultObservable.ToEnumerable()) {
    Console.WriteLine(obj.ToString());
}
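The problem is that I don't know how many pages need to be fetched, and the number could be large. I don't want to make hundreds of concurrent requests, so I want a way to limit the maximum number of tasks that execute in parallel. Is there a way to limit the concurrent invocations of GetPage?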
There is a Merge overload that takes a maxConcurrent parameter, but it does not seem to actually limit the concurrency of the function invocations: the console prints all the Before messages before any of the After messages.
Note: I need to convert back to IEnumerable<T>. I'm writing a data source for a system that gives me descriptors of data to fetch, and I need to give it back a list of the downloaded data.
Answer
Edit
The following should work. This Merge overload limits the number of concurrent subscriptions.
var resultObservable = pages
    .Select(p => Observable.FromAsync(() => GetPage(p)))
    .Merge(maxConcurrent);
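The sketch below wraps the answer's pipeline in a self-contained class so the limit can be checked. It assumes the System.Reactive NuGet package; FetchFromInternet is replaced by a hypothetical Task.Delay, and the class ThrottledFetch, its Run method and the in-flight counters are illustrative names, not part of the original code. Run records the highest number of GetPage calls running at the same time, which should never exceed maxConcurrent.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;   // System.Reactive NuGet package
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledFetch
{
    private static int _inFlight;     // GetPage calls currently running
    private static int _maxInFlight;  // highest value _inFlight has reached

    // Hypothetical stand-in for the question's GetPage/FetchFromInternet.
    private static async Task<string> GetPage(string page)
    {
        int now = Interlocked.Increment(ref _inFlight);

        // Lock-free "max" update: retry until _maxInFlight >= now.
        int seen;
        while (now > (seen = Volatile.Read(ref _maxInFlight)))
        {
            Interlocked.CompareExchange(ref _maxInFlight, now, seen);
        }

        try
        {
            await Task.Delay(50); // simulated network latency
            return "content of " + page;
        }
        finally
        {
            Interlocked.Decrement(ref _inFlight);
        }
    }

    // Runs the answer's pipeline and returns the results plus the peak concurrency.
    public static (List<string> Results, int PeakConcurrency) Run(
        IEnumerable<string> pages, int maxConcurrent)
    {
        _inFlight = 0;
        _maxInFlight = 0;

        IObservable<string> resultObservable = pages
            .Select(p => Observable.FromAsync(() => GetPage(p))) // creating these starts nothing
            .Merge(maxConcurrent);                               // at most maxConcurrent active subscriptions

        // Blocks until the observable completes, like the question's foreach.
        List<string> results = resultObservable.ToEnumerable().ToList();
        return (results, Volatile.Read(ref _maxInFlight));
    }
}
```

With ten pages and maxConcurrent set to 3, Run should return ten results and a peak of at most 3. ToEnumerable() blocks the calling thread, which matches the question's need to hand back a list; if the consuming system can accept a task instead, awaiting Rx's resultObservable.ToList() collects the same results without blocking.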
Explanation
To understand why this change is needed, we need some background:
1. FromAsync returns an observable that will invoke the passed Func every time it is subscribed to. This implies that if the observable is never subscribed to, the function will never be invoked.
2. Merge eagerly reads the source sequence, and only subscribes to n observables simultaneously.
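Point (1) can be checked in isolation. In this minimal sketch (again assuming System.Reactive; FromAsyncLaziness and Work are hypothetical names), creating the observable does not call the function, and each blocking Wait() subscribes once and therefore starts the function once more.

```csharp
using System;
using System.Reactive.Linq;   // System.Reactive NuGet package
using System.Threading.Tasks;

public static class FromAsyncLaziness
{
    private static int _invocations;

    // Hypothetical async function: counts how often it is started.
    private static Task<int> Work()
    {
        _invocations++;
        return Task.FromResult(42);
    }

    // Returns the call count right after creating the observable,
    // and the call count after subscribing to it twice.
    public static (int AfterCreate, int AfterTwoSubscriptions) Demo()
    {
        _invocations = 0;

        IObservable<int> obs = Observable.FromAsync(() => Work());
        int afterCreate = _invocations;  // Work has not run yet

        obs.Wait();  // first subscription starts Work
        obs.Wait();  // second subscription starts Work again

        return (afterCreate, _invocations);
    }
}
```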
With these two facts we can see why the original version executes everything in parallel: the first Select calls GetPage directly, so merely enumerating the source starts a task. Because of (2), GetPage has therefore already been invoked for every source string by the time Merge decides how many observables to subscribe to.
We can also see why the second version works: even though the sequence has been fully iterated over, (1) means that GetPage is not invoked until Merge subscribes to the corresponding observable, and Merge never holds more than n subscriptions at once. This leads to the desired result of only n tasks executing simultaneously.
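The two explanations can be compared side by side with a sketch in which every call stays pending forever, so counting started calls shows how many GetPage invocations each pipeline triggers. It assumes System.Reactive; EagerVsDeferred, the never-completing gate task and the 200 ms settle delay are illustrative choices, not part of the original answer. With 10 pages and maxConcurrent set to 3, the question's pipeline should report 10 started calls even though Merge only subscribes to 3, while the answer's pipeline should report 3.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;   // System.Reactive NuGet package
using System.Threading;
using System.Threading.Tasks;

public static class EagerVsDeferred
{
    private static int _started;

    // Hypothetical stand-in for GetPage: counts the call, then waits on a gate.
    private static async Task<string> GetPage(string page, Task gate)
    {
        Interlocked.Increment(ref _started);
        await gate;
        return page;
    }

    // Subscribes to one of the two pipelines and reports how many GetPage calls started.
    public static int StartedAfterSubscribe(bool deferred, int pageCount, int maxConcurrent)
    {
        _started = 0;
        Task gate = new TaskCompletionSource<bool>().Task; // never completes
        var pages = Enumerable.Range(0, pageCount).Select(i => "page" + i);

        IObservable<string> source = deferred
            // Answer's version: GetPage runs only when Merge subscribes.
            ? pages.Select(p => Observable.FromAsync(() => GetPage(p, gate)))
                   .Merge(maxConcurrent)
            // Question's version: GetPage runs as soon as Merge enumerates pages.
            : pages.Select(p => GetPage(p, gate))
                   .Select(t => Observable.FromAsync(() => t))
                   .Merge(maxConcurrent);

        using (source.Subscribe(_ => { }))
        {
            Thread.Sleep(200); // let the pipeline start whatever calls it is going to start
            return Volatile.Read(ref _started);
        }
    }
}
```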