Make Reactive Extensions Buffer wait for asynchronous operation to complete

Question

I am using Reactive Extensions (Rx) to buffer some data. The problem is that I then need to do something asynchronous with each buffered group, and I don't want the buffer to pass the next group through until that asynchronous operation is complete.

I've tried to structure the code two ways (contrived example):

public async Task processFiles(IEnumerable<File> files)
{
    await files.ToObservable()
        .Buffer(10)
        .SelectMany(fi => fi.Select(f => upload(f))) // Now have an IObservable<Task>
        .Select(t => t.ToObservable())
        .Merge()
        .LastAsync();
}

public Task upload(File item)
{
    return Task.Run(() => { /* Stuff */ });
}

public async Task processFiles(IEnumerable<File> files)
{
    var buffered = files.ToObservable()
        .Buffer(10);

    buffered.Subscribe(async batch => await Task.WhenAll(batch.Select(f => upload(f))));

    await buffered.LastAsync();
}

public Task upload(File item)
{
    return Task.Run(() => { /* Stuff */ });
}



Unfortunately, neither of these approaches works: the buffer pushes the next group before the async operations complete. The intent is to process each buffered group asynchronously and to continue with the next buffered group only once that processing is complete.

Any help is greatly appreciated.

Answer

First, I think your requirement to execute the items within each group in parallel, but the groups themselves in series, is quite unusual. A more common requirement is to execute the items in parallel, but at most n of them at the same time. That way there are no fixed groups, so if a single item takes too long, the other items don't have to wait for it.
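For comparison, the "at most n at a time" approach can be sketched with only the base class library. This is a minimal sketch, not part of the original answer: the helper name ExecuteThrottledAsync is hypothetical, and SemaphoreSlim is just one common way to cap concurrency.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleExtensions
{
    // Hypothetical helper: runs func for every item while keeping at most
    // maxConcurrency invocations in flight. There are no fixed groups, so a
    // slow item occupies one slot instead of stalling a whole batch.
    public static async Task ExecuteThrottledAsync<T>(
        this IEnumerable<T> source, Func<T, Task> func, int maxConcurrency)
    {
        using var gate = new SemaphoreSlim(maxConcurrency, maxConcurrency);

        async Task RunOneAsync(T item)
        {
            await gate.WaitAsync();        // wait for a free slot
            try
            {
                await func(item);
            }
            finally
            {
                gate.Release();            // free the slot even if func throws
            }
        }

        // Every item gets a task up front; the semaphore decides how many
        // of them actually run func at the same time.
        await Task.WhenAll(source.Select(RunOneAsync));
    }
}
```

Note that this creates one waiting task per item immediately, which is fine for moderate collections; for very large sources, a bounded producer/consumer setup would keep memory flat.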

To do what you're asking for, I think TPL Dataflow is more suitable than Rx (though some Rx code will still be useful). TPL Dataflow is centered around "blocks" that execute work, in series by default, which is exactly what you need.

Your code could look like this:

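using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class Extensions
{
    public static Task ExecuteInGroupsAsync<T>(
        this IEnumerable<T> source, Func<T, Task> func, int groupSize)
    {
        // The block handles one group at a time and awaits the returned
        // Task before it starts on the next group.
        var block = new ActionBlock<IEnumerable<T>>(
            g => Task.WhenAll(g.Select(func)));
        source.ToObservable()
              .Buffer(groupSize)
              .Subscribe(block.AsObserver());
        return block.Completion;
    }
}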

public Task ProcessFiles(IEnumerable<File> files)
{
    return files.ExecuteInGroupsAsync(Upload, 10);
}

This leaves most of the heavy lifting on the ActionBlock (and some on Rx). Dataflow blocks can act as Rx observers (and observables), so we can take advantage of that to keep using Buffer().

We want to handle the whole group at once, so we use Task.WhenAll() to create a Task that completes when the whole group completes. Dataflow blocks understand Task-returning functions, so the next group won't start executing until the Task returned by the previous group completes.

The final result is the Completion Task, which will complete after the source observable completes and all processing is done.

TPL Dataflow also has BatchBlock, which works like Buffer(), so we could Post() each item from the collection directly (without ToObservable() and AsObserver()). But I think using Rx for this part of the code makes it simpler.
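For reference, a rough sketch of that BatchBlock variant might look like this. It assumes the System.Threading.Tasks.Dataflow package is available to the project, and the method name ExecuteInGroupsWithBatchBlockAsync is hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowGroupExtensions
{
    // Hypothetical name; same contract as ExecuteInGroupsAsync, but the
    // grouping is done by BatchBlock instead of Rx Buffer().
    public static Task ExecuteInGroupsWithBatchBlockAsync<T>(
        this IEnumerable<T> source, Func<T, Task> func, int groupSize)
    {
        var batcher = new BatchBlock<T>(groupSize);

        // ActionBlock defaults to MaxDegreeOfParallelism = 1 and awaits the
        // returned Task, so one group finishes before the next one starts.
        var worker = new ActionBlock<T[]>(group => Task.WhenAll(group.Select(func)));

        // Forward completion so the worker completes after the last batch.
        batcher.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var item in source)
        {
            batcher.Post(item);
        }

        // Complete() also flushes any leftover items as a final, smaller batch.
        batcher.Complete();

        return worker.Completion;
    }
}
```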

Edit: Actually, you don't need TPL Dataflow here at all. Using ToEnumerable(), as James World suggested, is enough:

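using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static async Task ExecuteInGroupsAsync<T>(
     this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
    var groups = source.ToObservable().Buffer(groupSize).ToEnumerable();
    foreach (var g in groups)
    {
        // Await the whole group before moving on to the next one.
        await Task.WhenAll(g.Select(func));
    }
}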
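If you'd rather keep everything inside the Rx pipeline, a commonly used alternative is to wrap each group's work in a deferred observable and let Concat() subscribe to them one at a time. This is a sketch assuming the System.Reactive package; the method name ExecuteInGroupsRxAsync is hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class RxGroupExtensions
{
    // Hypothetical name; same contract as ExecuteInGroupsAsync.
    public static async Task ExecuteInGroupsRxAsync<T>(
        this IEnumerable<T> source, Func<T, Task> func, int groupSize)
    {
        await source.ToObservable()
            .Buffer(groupSize)
            // FromAsync does not start the group's work until it is subscribed to.
            .Select(group => Observable.FromAsync(() => Task.WhenAll(group.Select(func))))
            // Concat subscribes to the next group only after the previous one completes.
            .Concat()
            // Awaiting an observable that produces no values throws, so guard
            // against an empty source.
            .DefaultIfEmpty();
    }
}
```

The difference from the question's first attempt is Concat() versus Merge(): Merge() subscribes to every inner sequence as soon as it arrives, while Concat() waits for each one to finish.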



Or, even simpler, without Rx, using Batch() from MoreLINQ:

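using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using MoreLinq; // Batch() comes from the MoreLINQ package

public static async Task ExecuteInGroupsAsync<T>(
    this IEnumerable<T> source, Func<T, Task> func, int groupSize)
{
    var groups = source.Batch(groupSize);
    foreach (var group in groups)
    {
        await Task.WhenAll(group.Select(func));
    }
}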
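On .NET 6 or later, the base class library's Enumerable.Chunk() performs the same grouping, so the MoreLINQ dependency can be dropped. A minimal sketch, assuming .NET 6+; the method name ExecuteInGroupsChunkAsync is hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ChunkGroupExtensions
{
    // Assumes .NET 6 or later, where System.Linq provides Enumerable.Chunk.
    public static async Task ExecuteInGroupsChunkAsync<T>(
        this IEnumerable<T> source, Func<T, Task> func, int groupSize)
    {
        foreach (T[] group in source.Chunk(groupSize))
        {
            // Items inside a group run concurrently; the loop awaits the
            // whole group before pulling the next chunk.
            await Task.WhenAll(group.Select(func));
        }
    }
}
```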
