使用Subject解耦可观察的订阅和初始化 [英] Using Subject to decouple Observable subscription and initialisation

查看:125
本文介绍了使用Subject解耦可观察的订阅和初始化的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个公开IObservable状态的API.但是,此状态取决于必须通过Init初始化的潜在可观察源.

I have an API which exposes an IObservable Status. But this status depends on an underlying observable source which has to be initialised via Init.

我想做的是保护用户不必以正确的顺序执行操作:按照目前的现状,如果用户在执行Init之前尝试订阅Status,他们将获得一个例外.因为它们的来源尚未初始化.

What I'd like to do is protect the users from having to do things in the right order: as it currently stands, if they try to subscribe to the Status before performing an Init, they get an exception because they source is not initialised.

所以我有个天才的想法,就是使用Subject来使两者脱钩:订阅我的Status的外部用户只是订阅了Subject,然后当他们调用Init时,我订阅了基础服务使用我的主题.

So I had the genius idea of using a Subject to decouple the two: the external user subscribing to my Status is just subscribing to the Subject, then when they call Init, I subscribe to the underlying service using my Subject.

private ISubject<bool> _StatusSubject = new Subject<bool>();
public IObservable<bool> Status { get { return _StatusSubject; } }

public void Init() 
{
    _Connection = new Connection();
    Underlying.GetDeferredObservable(_Connection).Subscribe(_StatusSubject);
}

但是,从虚拟项目的测试来看,问题在于初始化 通过订阅主题来唤醒"我的基础可观察对象,即使没有人订阅该主题.我想尽可能避免这种情况,但是我不确定如何...

However, from tests on a dummy project, the problem is that the initialisation 'wakes up' my underlying Observable by subscribing the Subject to it, even if nobody has yet subscribed to the subject. That's something I'd like to avoid if possible, but I'm not sure how...

(我也谨记获得的智慧一般规则是,如果您使用主题,那么您做错了事"")

(I'm also mindful of the received wisdom that "the general rule is that if you're using a subject then you're doing something wrong")

推荐答案

似乎您缺少的概念是如何知道何时有人开始收听并且仅初始化您的基础源.通常,您使用Observable.Create或其同级之一(DeferUsing,...)来执行此操作.

It seems like the concept you are missing is how to know when someone starts listening and only init your underlying source. Usually you use Observable.Create or one of its siblings (Defer, Using, ...) to do this.

在没有Subject的情况下如何做:

Here's how to do it without a Subject:

private IObservable<bool> _status = Observable.Defer(() =>
{
    _Connection = new Connection();
    return Underlying.GetDeferredObservable(_Connection);
};

public IObservable<bool> Status { get { return _status; } }

Defer除非有人真正订阅,否则不会调用初始化代码.

Defer will not call the init code until someone actually subscribes.

但这有两个潜在的问题:

But this has a couple of potential issues:

  1. 每个观察者将建立一个新的连接
  2. 观察者退订时,连接不会被清除.

第二个问题很容易解决,所以让我们首先开始.假设您的Connection是一次性的,在这种情况下,您可以这样做:

The 2nd issue is easy to solve, so let's do that first. Let's assume your Connection is disposable, in which case you can just do:

private IObservable<bool> _status = Observable
    .Using(() => new Connection(),
           connection => Underlying.GetDeferredObservable(connection));

public IObservable<bool> Status { get { return _status; } }

通过此迭代,每当有人订阅时,都会创建一个新的Connection并将其传递给第二个lamba方法以构造可观察对象.每当观察者退订时,Connection就是Disposed.如果Connection不是IDisposable,则可以使用Disposable.Create(Action)创建一个IDisposable,它将执行清除连接所需的任何操作.

With this iteration, whenever someone subscribes, a new Connection is created and passed to the 2nd lamba method to construct the observable. Whenever the observer unsubscribes, the Connection is Disposed. If Connection is not a IDisposable, then you can use Disposable.Create(Action) to create an IDisposable which will run whatever action you need to run to cleanup the connection.

您仍然存在每个观察者都创建一个新连接的问题.我们可以使用PublishRefCount解决该问题:

You still have the problem that each observer creates a new connection. We can use Publish and RefCount to solve that problem:

private IObservable<bool> _status = Observable
    .Using(() => new Connection(),
           connection => Underlying.GetDeferredObservable(connection))
    .Publish()
    .RefCount();

public IObservable<bool> Status { get { return _status; } }

现在,当 first 观察者订阅时,将创建连接并订阅基础的observable.随后的观察者将共享连接并获取当前状态.当 last 观察者退订时,连接将被释放,所有连接均将关闭.如果此后有其他观察者订阅,则它们将再次重新备份.

Now, when the first observer subscribes, the connection will get created and the underlying observable will be subscribed. Subsequent observers will share the connection and will pick up the current status. When the last observer unsubscribes, the connection will be disposed and everything shut down. If another observer subscribes after that, it all starts back up again.

在引擎盖下,Publish实际上是在使用Subject共享单个可观察的信号源. RefCount正在跟踪当前正在观察的观察者数量.

Underneath the hood, Publish is actually using a Subject to share the single observable source. And RefCount is tracking how many observers are currently observing.

这篇关于使用Subject解耦可观察的订阅和初始化的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆