使 IObservable<T>使用 async/await 以原始顺序返回已完成的任务 [英] Making an IObservable&lt;T&gt; that uses async/await return completed tasks in original order

查看:39
本文介绍了使 IObservable<T>使用 async/await 以原始顺序返回已完成的任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设您有一个包含 100 个 url 的列表,并且您想下载它们,解析响应并通过 IObservable 推送结果:

Suppose you have a list of 100 urls and you want to download them, parse the response and push the results through an IObservable:

public IObservable<ImageSource> GetImages(IEnumerable<string> urls)
{
    return urls
        .ToObservable()
        .Select(async url =>
        {
            var bytes = await this.DownloadImage(url);
            var image = await this.ParseImage(bytes);
            return image;
        });
}

我对此有一些问题.

一个是同时攻击具有 100 个请求的服务器是不礼貌的 - 理想情况下,您将在给定时刻将速率限制为 6 个请求.但是,如果我添加 Buffer 调用,由于 Select 中的异步 lambda,所有内容仍会同时触发.

One is that it's bad etiquette to hammer a server with 100 requests at the same time -- ideally you would rate limit to maybe 6 requests at a given moment. However, if I add a Buffer call, due to the async lambda in Select, everything still fires at the same time.

此外,返回结果的顺序与 URL 的输入顺序不同,这很糟糕,因为图像是将在 UI 上显示的动画的一部分.

Moreover, the results will come back in a different order than the input sequence of URLs, which is bad, because the images are part of an animation that will be displayed on the UI.

我尝试了各种方法,我有一个有效的解决方案,但感觉很复杂:

I've tried all kinds of things, and I have a solution that's working, but it feels convoluted:

public IObservable<ImageSource> GetImages(IEnumerable<string> urls)
{
    var semaphore = new SemaphoreSlim(6);

    return Observable.Create<ImageSource>(async observable =>
    {
        var tasks = urls
            .Select(async url =>
            {
                await semaphore.WaitAsync();
                var bytes = await this.DownloadImage(url);
                var image = await this.ParseImage(url);
            })
            .ToList();

        foreach (var task in tasks)
        {
            observable.OnNext(await task);
        }

        observable.OnCompleted();
    });
}

它有效,但现在我在做 Observable.Create 而不是 IObservable.Select,而且我必须弄乱信号量.此外,在 UI 上运行的其他动画在运行时会停止(它们基本上只是 DispatcherTimer 实例),所以我想我一定是做错了什么.

It works, but now I'm doing Observable.Create instead of just IObservable.Select, and I have to mess with the semaphore. Also, other animations that run on the UI stop when this is running (they're basically just DispatcherTimer instances), so I think I must be doing something wrong.

推荐答案

试试这个:

urls.ToObservable()
    .Select(url => Observable.FromAsync(async () => {
        var bytes = await this.DownloadImage(url);
        var image = await this.ParseImage(bytes);
        return image;        
    }))
    .Merge(6 /*at a time*/);

我们在这里做什么?

对于每个 URL,我们正在创建一个 Cold Observable(即,在有人调用 Subscribe 之前根本不会做任何事情的).FromAsync 返回一个 Observable,当你订阅它时,它运行你给它的异步块.因此,我们将 URL 选择到一个可以为我们完成工作的对象中,但前提是我们稍后询问.

What are we doing here?

For each URL, we're creating a Cold Observable (i.e. one that won't do anything at all, until somebody calls Subscribe). FromAsync returns an Observable that, when you Subscribe to it, runs the async block you gave it. So, we're Selecting the URL into an object that will do the work for us, but only if we ask it later.

然后,我们的结果是一个 IObservable>——一个 Future 结果流.我们想将该流平展为一个结果流,因此我们使用 Merge(int).合并运算符将一次订阅 n 个项目,当它们回来时,我们将订阅更多.即使 url 列表很大,Merge 缓冲的项目也只是一个 URL 和一个 Func 对象(即 what 做什么的描述),所以相对较小.

Then, our result is an IObservable<IObservable<Image>> - a stream of Future results. We want to flatten that stream, into just a stream of results, so we use Merge(int). The merge operator will subscribe to n items at a time, and as they come back, we'll subscribe to more. Even if url list is very large, the items that Merge are buffering are only a URL and a Func object (i.e. the description of what to do), so relatively small.

这篇关于使 IObservable<T>使用 async/await 以原始顺序返回已完成的任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆