当最后一个观察者取消订阅时如何防止 AsyncSubject 完成 [英] How to prevent AsyncSubject from completing when the last observer unsubscribes

查看:34
本文介绍了当最后一个观察者取消订阅时如何防止 AsyncSubject 完成的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当最后一个主题观察者取消订阅主题时,AsyncSubject 变为可观察的.这是引用:

The AsyncSubject becomes observable when the last subject observer unsubscribes from the subject. Here is the quote:

当它完成时,它就完成了.主题在它们之后不能重复使用取消订阅、完成或出错.

When it’s done, it’s done. Subjects cannot be reused after they’re unsubscribed, completed or errored.

这是演示:

const ofObservable = Rx.Observable.of(1, 2, 3);
const subject = new Rx.AsyncSubject();

ofObservable.subscribe(subject);

subject.subscribe((v) => {
    console.log(v);
});

subject.unsubscribe((v) => {
    console.log(v);
});

// here I'll get the error "object unsubscribed"
subject.subscribe((v) => {
    console.log(v);
});

如何防止主题完成?

有一个 share 操作符:

在 RxJS 5 中,操作符 share() 生成了一个热的、refCounted 可观察对象可以在失败时重试,或在成功时重复.因为主题一旦出错、完成或以其他方式无法重复使用取消订阅,share() 运算符将回收死主题启用重新订阅结果可观察对象.

In RxJS 5, the operator share() makes a hot, refCounted observable that can be retried on failure, or repeated on success. Because subjects cannot be reused once they’ve errored, completed or otherwise unsubscribed, the share() operator will recycle dead subjects to enable resubscription to the resulting observable.

这就是我要找的.但是 share 创建了一个主题,我需要 AsyncSubject.

That is what I'm looking for. But share creates a subject and I need AsyncSubject.

推荐答案

问题归结为这一行:

subject.unsubscribe((v) => {
    console.log(v);
});

Subject 实现 <代码>ISubscription;这意味着它有一个 unsubscribe 方法和一个 closed 属性.它的实现unsubscribe如下:

Subject implements ISubscription; which means it has an unsubscribe method and a closed property. Its implementation of unsubscribe is as follows:

unsubscribe() {
  this.isStopped = true;
  this.closed = true;
  this.observers = null;
}

这有点残酷.从本质上讲,它会切断与该主题的任何订阅者的所有通信,而无需取消订阅.类似地,它不会从它可能订阅的任何 observable 中取消订阅主题本身.(它还将主题标记为已关闭/已停止,这就是您出错的原因.)

Which is somewhat brutal. Essentially, it severs all communication with any subscribers to the subject without unsubscribing them. Similarly, it does not unsubscribe the subject itself from any observable it might happen to be subscribed to. (It also marks the subject as closed/stopped, which was the reason for your error.)

鉴于它实际上并没有执行任何取消订阅,它应该如何使用尚不清楚.本次测试的说明:

Given that it doesn't acutally perform any unsubscriptions, how it's supposed to be used is unclear. The description of this test:

it('should disallow new subscriber once subject has been disposed', () => {

暗示这可能是 RxJS 4 的某种后遗症 - 在其中取消订阅被称为处置.

suggests that it might be some sort of hangover from RxJS 4 - in which unsubscription was termed disposal.

无论其存在的原因是什么,我都建议永远不要调用它.举个例子,看看这个片段:

Whatever the reason for its being, I would recommend never calling it. By way of example, look at this snippet:

const source = Rx.Observable
  .interval(200)
  .take(5)
  .do(value => console.log(`source: ${value}`));

const subject = new Rx.Subject();
source.subscribe(subject);

const subscription = subject
  .switchMap(() => Rx.Observable
    .interval(200)
    .take(5)
    .delay(500))
  .subscribe(value => console.log(`subscription: ${value}`));

.as-console-wrapper { max-height: 100% !important; top: 0; }

<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

它将一个主题订阅到一个源 observable,然后订阅一个由该主题组成的 observable.

It subscribes a subject to a source observable and then subscribes to an observable composed from the subject.

如果在主题上调用 unsubscribe,就会出现几个明显的问题:

If unsubscribe is called on the subject, a couple of problems become evident:

  • 主体对源的订阅并未取消订阅,并且当源尝试调用主体的 next 方法时会发生错误;和
  • 对由主题组成的 observable 的订阅并未取消订阅,因此 switchMap 中的 interval observable 在 unsubscribe 调用后继续发射制成.
  • the subject's subscription to the source is not unsubscribed and an error is effected when the source attempts to call the subject's next method; and
  • the subscription to the observable composed from the subject is not unsubscribed, so the interval observable within the switchMap keeps emitting after the unsubscribe call is made.

试试看:

const source = Rx.Observable
  .interval(200)
  .take(5)
  .do(value => console.log(`source: ${value}`));

const subject = new Rx.Subject();
source.subscribe(subject);

const subscription = subject
  .switchMap(() => Rx.Observable
    .interval(200)
    .take(5)
    .delay(500))
  .subscribe(value => console.log(`subscription: ${value}`));

setTimeout(() => {
  console.log("subject.unsubscribe()");
  subject.unsubscribe();
}, 700);

.as-console-wrapper { max-height: 100% !important; top: 0; }

<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

这些似乎都不是可取的行为,因此应避免在 Subject 上调用 unsubscribe.

None of this seems like desirable behaviour, so calling unsubscribe on a Subject is something to be avoided.

相反,您的代码段中的代码应该使用 subscribe 调用返回的 Subscription 取消订阅:

Instead, the code in your snippet should unsubscribe using the Subscription returned by the subscribe call:

const subscription = subject.subscribe((v) => {
  console.log(v);
});
subscription.unsubscribe();

<小时>

在写下这个答案之后,我找到了 以下评论来自Ben Lesh,这符合我的理论,即它与主题的处置有关:


Subsequent to writing this answer, I've found the following comment from Ben Lesh, that fits with my theory that it's related to the disposal of the subject:

如果您希望主题在有用后next 时大声而愤怒地出错,您可以直接在主题实例本身上调用 unsubscribe.

If you want the subject to loudly and angrily error when you next to it after it’s done being useful, you can call unsubscribe directly on the subject instance itself.

这篇关于当最后一个观察者取消订阅时如何防止 AsyncSubject 完成的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆