如何在 Reactive Extensions 中同时选择 Head 和 Tail [英] How to Select Head and Tail at the same time in Reactive Extensions

查看:48
本文介绍了如何在 Reactive Extensions 中同时选择 Head 和 Tail的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想创建以下组合器

public static IObservable<U> HeadTailSelect<T, U>
    (this IObservable<T> source, Func<T, IObservable<T>, U> fn)
{

}

选择器方法应该传递当前事件和所有未来事件的可观察对象,即尾部.必须保证在未来任何时候订阅尾部时,第一个接收到的事件将是在头部之后接收到的下一个事件.

The selector method should be passed the current event and an observable to all future events, the tail. It must be guaranteed that upon subscribing to the tail any time in the future the first received event will be the very next one that was received after the head.

我知道这需要一些缓冲,但我不太确定如何将它们放在一起.

I'm aware that this will require some buffering but I'm not quite sure how to put this together.

这有一些不错的特性.你可以这样做

This has some nice properties. You can do

IObservable<IObservable<Unit>> windows =
source
    .HeadTailSelect((h,tail)=>Observable
        .Interval(TimeSpan.FromSeconds(1))
        .TakeUntil(tail)
        .Select(_=>Unit.Default)
    )

并避免竞争条件,即在响应第一个事件后您错过一些事件后,在窗口内需要 TakeUntil 进行注册.

and avoid race conditions whereby within the window it takes TakeUntil to be registered after responding to the first event you miss some events.

还有关于如何测试实施的任何想法的奖励业力.

Also bonus karma for any ideas on how to test the implementation.

以下测试用例对于实现满足是必要的,尽管它可能不足以证明是否避免了竞争条件.

The following test case is necessary for the implementation to satisfy though it may not be sufficient to prove if race conditions are avoided.

public class HeadTailSelect : ReactiveTest
{
    TestScheduler _Scheduler = new TestScheduler();

    [Fact]
    public void ShouldWork()
    {

        var o = _Scheduler.CreateColdObservable
            (OnNext(10, "A")
            , OnNext(11, "B")
            , OnNext(12, "C")
            , OnNext(13, "D")
            , OnNext(14, "E")
            , OnNext(15, "F")
            , OnCompleted<string>(700)
            );

        var data = o.HeadTailSelect((head, tail) => tail.Take(2).ToList())
            .SelectMany(p=>p)
            .Select(l=>String.Join("-", l));


        var actual = _Scheduler.Start(() =>
            data
        , created: 0
        , subscribed: 1
        , disposed: 1000
        );

        actual.Messages.Count()
                .Should()
                .Be(7);

        var messages = actual.Messages.Take(6)
                             .Select(v => v.Value.Value)
                             .ToList();

        messages[0].Should().Be("B-C");
        messages[1].Should().Be("C-D");
        messages[2].Should().Be("D-E");
        messages[3].Should().Be("E-F");
        messages[4].Should().Be("F");
        messages[5].Should().Be("");

    }
}

推荐答案

这是通过上述测试的候选解决方案.但是,我不确定它是否满足要求.

Here is a candidate solution which passes the above test. However I'm not rock certain that it satisfies requirements.

/// <summary>
/// Pass the head and tail of the observable to the
/// selector function. Note that 
/// </summary>
/// <typeparam name="T"></typeparam>
/// <typeparam name="U"></typeparam>
/// <param name="source"></param>
/// <param name="fn"></param>
/// <returns></returns>
public static IObservable<U> HeadTailSelect<T, U>
    (this IObservable<T> source, Func<T, IObservable<T>, U> fn)
{
    var tail = new Subject<T>();
    return Observable.Create<U>(observer =>
    {
        return source.Subscribe(v =>
        {
            tail.OnNext(v);
            var u = fn(v, tail);
            observer.OnNext(u);

        }
        ,e=> { tail.OnCompleted();observer.OnError(e);  }
        ,()=> { tail.OnCompleted();observer.OnCompleted();  });
    });
}

请注意,u 很可能是某种IObservable,应该立即订阅.如果这样做了,我认为一切都应该没问题.

Note that u is most likely to be some kind of IObservable and should be subscribed to immediately. If this is done I think everything should be ok.

这篇关于如何在 Reactive Extensions 中同时选择 Head 和 Tail的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆