RX Observable.TakeWhile在每个元素之前检查条件,但是我需要在之后进行检查 [英] RX Observable.TakeWhile checks condition BEFORE each element but I need to perform the check after

查看:55
本文介绍了RX Observable.TakeWhile在每个元素之前检查条件,但是我需要在之后进行检查的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Observable.TakeWhile允许您在条件为真的情况下运行序列(使用委托,以便我们可以对实际的序列对象执行计算),但它会在每个元素之前检查此条件.如何在每个元素之后执行相同的检查?

Observable.TakeWhile allows you to run a sequence as long as a condition is true (using a delegate so we can perform computations on the actual sequence objects), but it's checking this condition BEFORE each element. How can I perform the same check but AFTER each element?

以下代码演示了问题

    void RunIt()
    {
        List<SomeCommand> listOfCommands = new List<SomeCommand>();
        listOfCommands.Add(new SomeCommand { CurrentIndex = 1, TotalCount = 3 });
        listOfCommands.Add(new SomeCommand { CurrentIndex = 2, TotalCount = 3 });
        listOfCommands.Add(new SomeCommand { CurrentIndex = 3, TotalCount = 3 });

        var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex != c.TotalCount);

        obs.Subscribe(x =>
        {
            Debug.WriteLine("{0} of {1}", x.CurrentIndex, x.TotalCount);
        });
    }

    class SomeCommand
    {
        public int CurrentIndex;
        public int TotalCount;
    }

此输出

1 of 3
2 of 3

我无法获得第三个元素

看这个例子,您可能会认为我要做的就是这样改变我的状况-

Looking at this example, you may think all I have to do is change my condition like so -

var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex <= c.TotalCount);

但是可观察对象将永远不会完成(因为在我的真实世界代码中,流不会在这三个命令之后结束)

But then the observable will never complete (because in my real world code, the stream doesn't end after those three commands)

推荐答案

最终

我的解决方案基于该线程中Sergey的TakeWhileInclusive实现-如何完成Rx Observable取决于事件中的情况

I based my solution off of Sergey's TakeWhileInclusive implementation in this thread - How to complete a Rx Observable depending on a condition in a event

public static IObservable<TSource> TakeUntil<TSource>(
        this IObservable<TSource> source, Func<TSource, bool> predicate)
{
    return Observable
        .Create<TSource>(o => source.Subscribe(x =>
        {
            o.OnNext(x);
            if (predicate(x))
                o.OnCompleted();
        },
        o.OnError,
        o.OnCompleted
    ));
}

这篇关于RX Observable.TakeWhile在每个元素之前检查条件,但是我需要在之后进行检查的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆