检查 publishReplay().refCount() 是否有观察者 [英] Check if publishReplay().refCount() has observers or not

查看:40
本文介绍了检查 publishReplay().refCount() 是否有观察者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我像这样定义了一个 Observable:

I define an Observable like this:

const obs$ = Observable.create(...)
  .publishReplay(1)
  .refCount();

这样它就会在我的源 Observable 和所有观察者之间放置一个 ReplaySubject(1).

So that it puts a ReplaySubject(1) between my source Observable and all observers.

由于 ReplaySubject 在其状态中具有观察者的数量(通过其 observers 数组属性),如何从 obs$ 访问 ReplaySubject?

Since ReplaySubject has in its state the number of observers (via its observers array property), how is it possible to access the ReplaySubject from obs$?

我实际上只需要知道 obs$ 是否有任何观察者.RxJS4 在 Subject 上有一个 hasObservers() 方法,但它在 RxJS5 中被删除了.我如何使用 RxJS5 实现这一点?

I actually only need to know if obs$ has any observers or not. RxJS4 had a hasObservers() method on Subject, but it got removed in RxJS5. How can I achieve this with RxJS5?

推荐答案

不确定您的用法,但为了我的需要,我创建了一个自定义运算符,允许我根据状态透明地执行副作用(类似于点击)引用计数.它只是进行直通订阅,并对订阅/取消订阅进行鸭打.回调获取当前的 refCount 和前一个,以便您可以判断状态和方向.我喜欢为此使用运算符,因为我可以在流中的任何点插入它.如果您只是想要一个关于是否有任何订阅的二进制输出,可以很容易地修改.

Not sure about your usage but for my needs I created a custom operator that allowed me to transparently perform side-effects (similar to tap) based on the state of the refCount. It just does a pass-through subscription and duck-punches the sub/unsub. The callback gets the current refCount and the previous so that you can tell the state and direction. I like using an operator for this since I can insert it at any point in my stream. If you simply want a binary output for whether there are any subscriptions or not it could be easily modified for that.

const { Observable, Observer, interval } = rxjs;
const { publishReplay, refCount } = rxjs.operators;

const tapRefCount = (onChange) => (source) => {
  let refCount = 0;

  // mute the operator if it has nothing to do
  if (typeof onChange !== 'function') {
    return source;
  }
  // mute errors from side-effects
  const safeOnChange = (refCount, prevRefCount) => {
    try {
      onChange(refCount, prevRefCount);
    } catch (e) {
    }
  };
  
  // spy on subscribe
  return Observable.create((observer) => {
    const subscription = source.subscribe(observer);
    const prevRefCount = refCount;
    refCount++;
    safeOnChange(refCount, prevRefCount);
    
    // spy on unsubscribe
    return () => {
      subscription.unsubscribe();
      const prevRefCount = refCount;
      refCount--;
      safeOnChange(refCount, prevRefCount);
    };
  });
};

const source = interval(1000).pipe(
  publishReplay(1),
  refCount(),
  tapRefCount((refCount, prevRefCount) => { console.log('refCount', refCount, prevRefCount > refCount ? 'down': 'up'); })
);

const firstSub = source.subscribe((x) => { console.log('first', x); });
let secondSub;
setTimeout(() => {
  secondSub = source.subscribe((x) => { console.log('second', x); });
}, 1500);
setTimeout(() => {
  firstSub.unsubscribe();
}, 4500);
setTimeout(() => {
  secondSub.unsubscribe();
}, 5500);

<script src="https://unpkg.com/rxjs@rc/bundles/rxjs.umd.min.js"></script>

打字稿版本:

import { Observable } from 'rxjs/Observable';
import { Observer } from 'rxjs/Observer';

export const tapRefCount = (
  onChange: (refCount: number, prevRefCount: number) => void
) => <T>(source: Observable<T>): Observable<T> => {
  let refCount = 0;

  // mute the operator if it has nothing to do
  if (typeof onChange !== 'function') {
    return source;
  }

  // mute errors from side-effects
  const safeOnChange = (refCount, prevRefCount) => {
      try {
        onChange(refCount, prevRefCount);
      } catch (e) {
    }
  };

  // spy on subscribe
  return Observable.create((observer: Observer<T>) => {
    const subscription = source.subscribe(observer);
    const prevRefCount = refCount;
    refCount++;
    safeOnChange(refCount, prevRefCount);

    // spy on unsubscribe
    return () => {
      subscription.unsubscribe();
      const prevRefCount = refCount;
      refCount--;
      safeOnChange(refCount, prevRefCount);
    };
  }) as Observable<T>;
};

这篇关于检查 publishReplay().refCount() 是否有观察者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆