为什么在这个 Suspendable 实现(“Rx Pausable")中没有调用 OnCompleted? [英] Why is not OnCompleted not called in this Suspendable implementation ("Rx Pausable")?

查看:46
本文介绍了为什么在这个 Suspendable 实现(“Rx Pausable")中没有调用 OnCompleted?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我阅读了以下摘自 Ollie Riches 的 博客文章 试图让 Rx 更实用,结果和作者一样疑惑:为什么 OnCompleted 没有通过?有人能告诉这里发生了什么吗?也许有些简单的令人尴尬?

I read the following snipped from Ollie Riches' blog post Trying to be more functional with Rx and became to wonder the same too as the author: why is not OnCompleted passed? Could someone tell what is happening here? Perhaps something embarrassingly simple?

为了方便起见,这里对代码进行了一些修改和复制(如果不能在这里翻录他的代码,我向 Ollie 道歉):

The code is a bit bit modified and reproduced here for convenience (and my apologies to Ollie if it wasn't acceptable to rip his code here):

public static class RxExtensions
{
    public static IObservable<T> Suspendable<T>(this IObservable<T> stream, IObservable<bool> suspend, bool initialState = false)
    {
        return Observable.Create<T>(o =>
        {
            var disposable = suspend.StartWith(initialState)
                    .DistinctUntilChanged()
                    .Select(s => s ? Observable.Empty<T>() : stream)
                    .Switch()
                    .Subscribe(o);

            return disposable;
        });
    }
}

var testScheduler = new TestScheduler();
var generatorCount = 10;

//If the limit will be hardcoded to something less than generatorCount, an exception will be
//thrown and the exception object will be set. Why it doesn't happen to completed in the following?
var generator = Observable.Generate(1,
    x => x <= generatorCount,
    x => x + 1,
    x => { if(x != 11) { Console.WriteLine(x); return x; } else { throw new ArgumentException(); } },
    x => TimeSpan.FromSeconds(1),
    testScheduler);


Exception exception = null;
var completed = false;
generator.Suspendable(new Subject<bool>()).Subscribe(_ => { }, e => exception = e, () => completed = true);   
testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1001000).Ticks);

Console.WriteLine(exception);
Console.WriteLine(completed);

为了记录,我想尝试产生一个可以暂停和停止的流,区别在于暂停的流累积事件,暂停只是跳过它们.它开始看起来比我预期的要复杂一些,特别是如果有人想对暂停的位设置限制或保存策略".哦,好吧...

For the record, I was thinking to try to produce a stream that can be paused and stopped with the distinction that paused stream accumulates events, suspeneded just skips them. It started to look a bit more involved than I expected to, especially if one thinks to put a limit or "save strategy" to the paused bit. Oh well...

<edit: 有趣的是,我刚刚注意到一个 Pausable 的 RxJS 实现.

<edit: Interestingly, I just noticed a RxJS implementation of Pausable.

推荐答案

您的观察者订阅了 suspend 流和 source 流.直到两个流都完成后,这个组合流才会完成.基本上您的 source 流已完成,但 Suspendable 正在等待查看是否有更多暂停/取消暂停信号通过.如果他们这样做,它将重新订阅源流.

Your observer is subscribed to both the suspend stream and the source stream. This combined stream will not complete until both streams complete. Basically your source stream completes, but the Suspendable is waiting to see if any more pause/unpause signals will come through. If they do, it will resubscribe to the source stream.

在源流完成时完成可暂停流是可能的,但可能会破坏您的方法的目的.基本上必须保持订阅源流并在源完成时结束暂停的流.你可以这样做:

To have the pausable stream complete when the source stream completes is possible, but probably would defeat the purpose of your method. Something would basically have to stay subscribed to the source stream and end the paused stream when source completes. You could do it with something like this:

var shared = stream.Publish();
var pausable = suspend
    .StartWith(initialState)
    .TakeUntil(shared.LastOrDefaultAsync())
    .DistinctUntilChanged()
    .Select(p => p ? shared : Observable.Empty<T>())
    .Switch();
var disposable = new CompositeDisposable(pausable.Subscribe(o), shared.Connect());
return disposable;

这篇关于为什么在这个 Suspendable 实现(“Rx Pausable")中没有调用 OnCompleted?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆