Converting an IEnumerable<T> to IObservable<T>, with maximum parallelism

Question

I have a sequence of async tasks to do (say, fetch N web pages). Now what I want is to expose them all as an IObservable<T>. My current solution uses the answer from this question (http://stackoverflow.com/questions/13500456/how-to-convert-an-ienumerabletaskt-to-iobservablet):

async Task<ResultObj> GetPage(string page) {
    Console.WriteLine("Before");
    var result = await FetchFromInternet(page);
    Console.WriteLine("After");
    return result;
}

// pages is an IEnumerable<string>
IObservable<ResultObj> resultObservable = pages.Select(GetPage)
                 .Select(t => Observable.FromAsync(() => t)).Merge();

// Now consume the list
foreach(ResultObj obj in resultObservable.ToEnumerable()) {
    Console.WriteLine(obj.ToString());
}

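The problem is that I don't know the number of pages to fetch, and it could be large. I don't want to make hundreds of concurrent requests, so I want a way to limit the maximum number of tasks that execute in parallel. Is there a way to limit the number of concurrent GetPage invocations?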

There is a Merge overload that takes a maxConcurrent parameter, but it does not seem to actually limit the concurrency of the function invocation. The console prints all the Before messages before the After messages.

Note: I need to convert back to IEnumerable<T>. I'm writing a data source for a system that gives me descriptors of data to fetch, and I need to give it back a list of the downloaded data.
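Since the consumer ultimately needs a list, one option besides ToEnumerable() (whose enumerator blocks the consuming thread while it waits for each element) is to let Rx collect everything once the observable completes. A minimal sketch, assuming the System.Reactive NuGet package and C# 7.1 or later for async Main; `BackToList` and `CollectAsync` are hypothetical names used only for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class BackToList
{
    // Observable.ToList() emits one IList<T> when the source completes;
    // awaiting an IObservable<T> (an extension in System.Reactive.Linq)
    // subscribes to it and returns that single value.
    public static async Task<IList<T>> CollectAsync<T>(IObservable<T> source) =>
        await source.ToList();

    public static async Task Main()
    {
        IList<int> items = await CollectAsync(Observable.Range(1, 3));
        Console.WriteLine(string.Join(", ", items)); // prints "1, 2, 3"
    }
}
```

The list is only produced after the whole sequence has completed, which matches a data source that must hand back every downloaded item at once.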

Recommended answer

Edit

The following should work. This overload limits the number of concurrent subscriptions.

var resultObservable = pages
  .Select(p => Observable.FromAsync(() => GetPage(p)))
  .Merge(maxConcurrent);
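The answer's pipeline can be checked with a small self-contained experiment. This is a sketch under stated assumptions: it needs the System.Reactive NuGet package, `GetPage` here is a hypothetical stand-in that uses `Task.Delay` instead of a real download, and the `_running`/`_peak` counters exist only to measure how many calls overlap.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ThrottledFetch
{
    static readonly object Gate = new object();
    static int _running;  // GetPage calls currently in flight
    static int _peak;     // highest value _running has reached

    // Hypothetical stand-in for GetPage: Task.Delay simulates the download.
    static async Task<string> GetPage(string page)
    {
        lock (Gate) { _running++; _peak = Math.Max(_peak, _running); }
        try
        {
            await Task.Delay(50);
            return "content of " + page;
        }
        finally
        {
            lock (Gate) { _running--; }  // runs before the returned task completes
        }
    }

    // The answer's pipeline; returns the results and the observed peak concurrency.
    public static async Task<(IList<string> Results, int Peak)> Run(
        IEnumerable<string> pages, int maxConcurrent)
    {
        lock (Gate) { _running = 0; _peak = 0; }
        IList<string> results = await pages
            .Select(p => Observable.FromAsync(() => GetPage(p)))  // deferred: GetPage runs on subscribe
            .Merge(maxConcurrent)                                 // at most maxConcurrent subscriptions
            .ToList();
        int peak;
        lock (Gate) { peak = _peak; }
        return (results, peak);
    }

    public static async Task Main()
    {
        var pages = Enumerable.Range(1, 10).Select(i => "page" + i);
        var (results, peak) = await Run(pages, 3);
        Console.WriteLine($"{results.Count} results, peak concurrency {peak}");
    }
}
```

Merge forwards each result as soon as its inner observable produces it, so the list is in completion order rather than in the order of `pages`; this stand-in returns the page name inside each result so the two can be matched up.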

Explanation

In order to understand why this change is needed, we need some background:


  1. FromAsync returns an observable that will invoke the passed Func every time it is subscribed to. This implies that if the observable is never subscribed to, the Func will never be invoked.

  2. Merge eagerly reads the source sequence, and only subscribes to n observables simultaneously.
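Point (1) can be observed directly by counting how often the Func passed to FromAsync runs. A minimal sketch, assuming the System.Reactive NuGet package; awaiting an IObservable<T> subscribes to it, which is what triggers the Func here. `FromAsyncLaziness` and `Demo` are illustrative names, not part of the original answer.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class FromAsyncLaziness
{
    public static async Task<(int BeforeSubscribe, int AfterTwoSubscriptions)> Demo()
    {
        int calls = 0;

        // Creating the observable only stores the Func; it does not call it.
        IObservable<int> source = Observable.FromAsync(async () =>
        {
            calls++;
            await Task.Delay(10);
            return 42;
        });
        int before = calls;

        await source;  // subscribing (via await) invokes the Func once...
        await source;  // ...and a second subscription invokes it again
        return (before, calls);
    }

    public static async Task Main()
    {
        var (before, after) = await Demo();
        // prints "before subscribing: 0, after two subscriptions: 2"
        Console.WriteLine($"before subscribing: {before}, after two subscriptions: {after}");
    }
}
```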

With these two facts we can see why the original version executes everything in parallel: the outer Select(GetPage) calls GetPage as each element is enumerated, so because of (2), GetPage will already have been invoked for every source string by the time Merge decides how many observables to subscribe to.
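The eager behaviour of the original version can be reproduced the same way. This sketch assumes the System.Reactive NuGet package and uses a hypothetical `Task.Delay`-based `GetPage`; it keeps the question's `Select(GetPage)` shape, passes `maxConcurrent` to Merge, and reports how many `GetPage` calls were in flight at once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class EagerVersion
{
    static readonly object Gate = new object();
    static int _running, _peak;

    // Hypothetical GetPage stand-in; Task.Delay simulates the download.
    static async Task<string> GetPage(string page)
    {
        lock (Gate) { _running++; _peak = Math.Max(_peak, _running); }
        try { await Task.Delay(200); return page; }
        finally { lock (Gate) { _running--; } }
    }

    // The question's pipeline with Merge(maxConcurrent): GetPage is called inside
    // the outer Select, so each task is already running before Merge subscribes.
    public static async Task<int> PeakConcurrency(IEnumerable<string> pages, int maxConcurrent)
    {
        lock (Gate) { _running = 0; _peak = 0; }
        await pages
            .Select(GetPage)                             // starts a task per page during enumeration
            .Select(t => Observable.FromAsync(() => t))  // wraps an already-running task
            .Merge(maxConcurrent)
            .ToList();
        lock (Gate) { return _peak; }
    }

    public static async Task Main()
    {
        int peak = await PeakConcurrency(new[] { "a", "b", "c", "d", "e", "f" }, 2);
        Console.WriteLine($"maxConcurrent = 2, peak concurrency = {peak}");
    }
}
```

With six pages and `maxConcurrent = 2`, the peak is expected to be well above 2 (typically 6), because all six tasks start while Merge is still reading the source; the exact value depends on timing.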

We can also see why the second version works: even though the sequence has been fully iterated over, (1) means that GetPage is not invoked until Merge subscribes to the corresponding observable, and Merge keeps at most n subscriptions active. This leads to the desired result of only n tasks executing simultaneously.
