如何在Rx.Net中实现exhaustMap处理程序? [英] How can I implement an exhaustMap handler in Rx.Net?

查看:54
本文介绍了如何在Rx.Net中实现exhaustMap处理程序?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在从 rxjs 寻找类似于 exhaustMap 运算符的东西,但是 RX.NET 似乎没有这样的运算符.

I am looking for something similar to the exhaustMap operator from rxjs, but RX.NET does not seem to have such an operator.

我需要实现的是,在源流的每个元素上,我需要启动一个 async 处理程序,直到完成,我想从源代码中删除任何元素.处理程序完成后,立即恢复使用元素.

What I need to achieve is that, upon every element of the source stream, I need to start an async handler, and until it finishes, I would like to drop any elements from the source. As soon as the handler finishes, resume taking elements.

我不希望在每个元素上启动异步处理程序-在处理程序运行时,我要删除源元素.

What I don't want is to start an async handler upon every element - while the handler runs, I want to drop source elements.

我还怀疑我需要在这里巧妙地使用defer运算符吗?

I also suspect I need to cleverly use the defer operator here?

谢谢!

推荐答案

这是

Here is an implementation of the ExhaustMap operator. The source observable is projected to an IObservable<Task<TResult>>, where each subsequent task is either the previous one if it's still running, or otherwise a new task associated with the current item. Repeated occurrences of the same task are then removed with the DistinctUntilChanged operator, and finally the observable is flattened with the Concat operator.

/// <summary>Invokes an asynchronous function for each element of an observable
/// sequence, ignoring elements that are emitted before the completion of an
/// asynchronous function of a preceding element.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, Task<TResult>> function)
{
    return source
        .Scan(Task.FromResult<TResult>(default), (previousTask, item) =>
        {
            return !previousTask.IsCompleted ? previousTask : HideIdentity(function(item));
        })
        .DistinctUntilChanged()
        .Concat();

    async Task<TResult> HideIdentity(Task<TResult> task) => await task;
}

函数返回的任务不能保证是不同的,因此需要 HideIdentity 本地函数来返回任务的不同包装.

The tasks returned by the function are not guaranteed to be distinct, hence the need for the HideIdentity local function that returns distinct wrappers of the tasks.

用法示例:

Observable
    .Interval(TimeSpan.FromMilliseconds(200))
    .Select(x => (int)x + 1)
    .Take(10)
    .Do(x => Console.WriteLine($"Input: {x}"))
    .ExhaustMap(async x => { await Task.Delay(x % 3 == 0 ? 500 : 100); return x; })
    .Do(x => Console.WriteLine($"Result: {x}"))
    .Wait();

输出:

Input: 1
Result: 1
Input: 2
Result: 2
Input: 3
Input: 4
Input: 5
Result: 3
Input: 6
Input: 7
Input: 8
Result: 6
Input: 9
Input: 10
Result: 9


更新:这是一个替代实现,其中 function 生成 IObservable< TResult> 而不是 Task< TResult> :


Update: Here is an alternative implementation, where the function produces an IObservable<TResult> instead of a Task<TResult>:

/// <summary>Projects each element to an observable sequence, which is merged
/// in the output observable sequence only if the previous projected observable
/// sequence has completed.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, IObservable<TResult>> function)
{
    return Observable.Using(() => new SemaphoreSlim(1, 1),
        semaphore => source.SelectMany(item => ProjectItem(item, semaphore)));

    IObservable<TResult> ProjectItem(TSource item, SemaphoreSlim semaphore)
    {
        // Attempt to acquire the semaphore immediately. If successful, return
        // a sequence that releases the semaphore when terminated. Otherwise,
        // return immediately an empty sequence.
        return Observable.If(() => semaphore.Wait(0),
            Observable
                .Defer(() => function(item))
                .Finally(() => semaphore.Release())
        );
    }
}

这很有趣,因为如果需要,它可以很容易地修改并行度.只需使用不同于1的 initialCount 参数实例化 SemaphoreSlim .

This one is interesting because it makes it very easy to modify the degree of parallelism, if needed. Just instantiate the SemaphoreSlim with an initialCount argument different than 1.

这篇关于如何在Rx.Net中实现exhaustMap处理程序?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆