是否可以重新启动 IConnectableObservable? [英] Is it possible to restart an IConnectableObservable?

查看:50
本文介绍了是否可以重新启动 IConnectableObservable?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想弄清楚如何在 IConnectableObservable 完成或出错后重新启动"它.

I'm trying to figure out how to "restart" an IConnectableObservable after it has completed or errored.

下面的代码显示了一个可连接的 observable 的两个订阅者 (A, B).一旦他们的订阅被处理,一个新的订阅者 (C) 就会连接.我希望它在 C 看来是一个全新的可观察对象,但我只在第一个订阅中引发了异常.

The code below shows two subscribers (A, B) to a connnectable observable. Once their subscription is disposed, a new subscriber (C) connects. I'd like it to appear to C that it is an entirely new observable, but I am only getting the exception raised in the first subscription.

static void Main(string[] args)
{
    var o = Observable.Create<string>(observer =>
        {
            observer.OnNext("msg");
            observer.OnError(new Exception("boom"));
            return Disposable.Create(() => {
                Console.WriteLine("Observer has unsubscribed");
            });
        }
    )
    .Publish();

    o.Subscribe(
        x => Console.WriteLine("A: " + x),
        ex => Console.WriteLine("A: " + ex),
        () => Console.WriteLine("A: done"));

    o.Subscribe(
        x => Console.WriteLine("B: " + x),
        ex => Console.WriteLine("B: " + ex),
        () => Console.WriteLine("B: done"));            

    var subscription = o.Connect();

    subscription.Dispose();

    o.Subscribe(
        x => Console.WriteLine("C: " + x),
        ex => Console.WriteLine("C: " + ex),
        () => Console.WriteLine("C: done"));        

    subscription = o.Connect();

}

给出以下结果:

A: msg
B: msg
A: System.Exception: boom
B: System.Exception: boom
Observer has unsubscribed
C: System.Exception: boom
Observer has unsubscribed

而我想:

A: msg
B: msg
A: System.Exception: boom
B: System.Exception: boom
Observer has unsubscribed
C: msg
C: System.Exception: boom
Observer has unsubscribed

有什么想法吗?谢谢!

推荐答案

虽然它不会重新启动"可观察对象,但将 Publish 替换为 Replay 提供输出你在期待.但是,请记住,这将缓冲来自源 observable 的所有值.最好限制重播值的数量.

While it doesn't "restart" the observable, replacing Publish with Replay provides the output you are expecting. However, keep in mind this will buffer all of the values from the source observable. It's best to put a limit on the number of replayed values.

这篇关于是否可以重新启动 IConnectableObservable?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆