Rx.NET“门"操作员 [英] Rx.NET "gate" operator

查看:22
本文介绍了Rx.NET“门"操作员的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

[注意:如果重要的话,我使用的是 3.1.另外,我在 codereview 上问过这个问题,但到目前为止还没有回复.]

[Note: I am using 3.1 if that matters. Also, I've asked this on codereview but no responses so far.]

我需要一个运算符来允许布尔值流充当另一个流的门(当门流为真时让值通过,当它为假时丢弃它们).我通常会为此使用 Switch,但如果源流很冷,它会继续重新创建它,这是我不想要的.

I need an operator to allow a stream of booleans to act as a gate for another stream (let values pass when the gate stream is true, drop them when it's false). I would normally use Switch for this, but if the source stream is cold it will keep recreating it, which I don't want.

我也想在自己之后进行清理,以便在源或门完成时结果完成.

I also want to clean up after myself, so that the result completes if either of the source or the gate complete.

public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
    var s = source.Publish().RefCount();
    var g = gate.Publish().RefCount();

    var sourceCompleted = s.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);
    var gateCompleted = g.TakeLast(1).DefaultIfEmpty().Select(_ => Unit.Default);

    var anyCompleted = Observable.Amb(sourceCompleted, gateCompleted);

    var flag = false;
    g.TakeUntil(anyCompleted).Subscribe(value => flag = value);

    return s.Where(_ => flag).TakeUntil(anyCompleted);
}

除了整体冗长之外,我不喜欢我订阅门,即使结果从未订阅过(在这种情况下,此运算符应该是空操作).有没有办法摆脱订阅?

Besides the overall verbosity, I dislike that I subscribe to the gate even if the result is never subscribed to (in which case this operator should be a no-op). Is there a way to get rid of that subscribe?

我也尝试过这种实现,但在自行清理时更糟:

I have also tried this implementation, but it's even worse when it comes to cleaning up after itself:

return Observable.Create<T>(
    o =>
    {
        var flag = false;
        gate.Subscribe(value => flag = value);

        return source.Subscribe(
            value =>
            {
                if (flag) o.OnNext(value);
            });
    });

这些是我用来检查实现的测试:

These are the tests I'm using to check the implementation:

[TestMethod]
public void TestMethod1()
{
    var output = new List<int>();

    var source = new Subject<int>();
    var gate = new Subject<bool>();

    var result = source.When(gate);
    result.Subscribe(output.Add, () => output.Add(-1));

    // the gate starts with false, so the source events are ignored
    source.OnNext(1);
    source.OnNext(2);
    source.OnNext(3);
    CollectionAssert.AreEqual(new int[0], output);

    // setting the gate to true will let the source events pass
    gate.OnNext(true);
    source.OnNext(4);
    CollectionAssert.AreEqual(new[] { 4 }, output);
    source.OnNext(5);
    CollectionAssert.AreEqual(new[] { 4, 5 }, output);

    // setting the gate to false stops source events from propagating again
    gate.OnNext(false);
    source.OnNext(6);
    source.OnNext(7);
    CollectionAssert.AreEqual(new[] { 4, 5 }, output);

    // completing the source also completes the result
    source.OnCompleted();
    CollectionAssert.AreEqual(new[] { 4, 5, -1 }, output);
}

[TestMethod]
public void TestMethod2()
{
    // completing the gate also completes the result
    var output = new List<int>();

    var source = new Subject<int>();
    var gate = new Subject<bool>();

    var result = source.When(gate);
    result.Subscribe(output.Add, () => output.Add(-1));

    gate.OnCompleted();
    CollectionAssert.AreEqual(new[] { -1 }, output);
}

推荐答案

这有效:

public static IObservable<T> When<T>(this IObservable<T> source, IObservable<bool> gate)
{
    return
        source.Publish(ss => gate.Publish(gs =>
            gs
                .Select(g => g ? ss : ss.IgnoreElements())
                .Switch()
                .TakeUntil(Observable.Amb(
                    ss.Select(s => true).Materialize().LastAsync(),
                    gs.Materialize().LastAsync()))));
}

这通过了两个测试.

这篇关于Rx.NET“门"操作员的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆