检查 publishReplay().refCount() 是否有观察者 [英] Check if publishReplay().refCount() has observers or not
问题描述
我像这样定义了一个 Observable:
I define an Observable like this:
const obs$ = Observable.create(...)
.publishReplay(1)
.refCount();
这样它就会在我的源 Observable 和所有观察者之间放置一个 ReplaySubject(1).
So that it puts a ReplaySubject(1) between my source Observable and all observers.
由于 ReplaySubject 在其状态中具有观察者的数量(通过其 observers
数组属性),如何从 obs$
访问 ReplaySubject?
Since ReplaySubject has in its state the number of observers (via its observers
array property), how is it possible to access the ReplaySubject from obs$
?
我实际上只需要知道 obs$
是否有任何观察者.RxJS4 在 Subject 上有一个 hasObservers() 方法,但它在 RxJS5 中被删除了.我如何使用 RxJS5 实现这一点?
I actually only need to know if obs$
has any observers or not. RxJS4 had a hasObservers() method on Subject, but it got removed in RxJS5. How can I achieve this with RxJS5?
推荐答案
不确定您的用法,但为了我的需要,我创建了一个自定义运算符,允许我根据状态透明地执行副作用(类似于点击)引用计数.它只是进行直通订阅,并对订阅/取消订阅进行鸭打.回调获取当前的 refCount 和前一个,以便您可以判断状态和方向.我喜欢为此使用运算符,因为我可以在流中的任何点插入它.如果您只是想要一个关于是否有任何订阅的二进制输出,可以很容易地修改.
Not sure about your usage but for my needs I created a custom operator that allowed me to transparently perform side-effects (similar to tap) based on the state of the refCount. It just does a pass-through subscription and duck-punches the sub/unsub. The callback gets the current refCount and the previous so that you can tell the state and direction. I like using an operator for this since I can insert it at any point in my stream. If you simply want a binary output for whether there are any subscriptions or not it could be easily modified for that.
const { Observable, Observer, interval } = rxjs;
const { publishReplay, refCount } = rxjs.operators;
const tapRefCount = (onChange) => (source) => {
let refCount = 0;
// mute the operator if it has nothing to do
if (typeof onChange !== 'function') {
return source;
}
// mute errors from side-effects
const safeOnChange = (refCount, prevRefCount) => {
try {
onChange(refCount, prevRefCount);
} catch (e) {
}
};
// spy on subscribe
return Observable.create((observer) => {
const subscription = source.subscribe(observer);
const prevRefCount = refCount;
refCount++;
safeOnChange(refCount, prevRefCount);
// spy on unsubscribe
return () => {
subscription.unsubscribe();
const prevRefCount = refCount;
refCount--;
safeOnChange(refCount, prevRefCount);
};
});
};
const source = interval(1000).pipe(
publishReplay(1),
refCount(),
tapRefCount((refCount, prevRefCount) => { console.log('refCount', refCount, prevRefCount > refCount ? 'down': 'up'); })
);
const firstSub = source.subscribe((x) => { console.log('first', x); });
let secondSub;
setTimeout(() => {
secondSub = source.subscribe((x) => { console.log('second', x); });
}, 1500);
setTimeout(() => {
firstSub.unsubscribe();
}, 4500);
setTimeout(() => {
secondSub.unsubscribe();
}, 5500);
<script src="https://unpkg.com/rxjs@rc/bundles/rxjs.umd.min.js"></script>
打字稿版本:
import { Observable } from 'rxjs/Observable';
import { Observer } from 'rxjs/Observer';
export const tapRefCount = (
onChange: (refCount: number, prevRefCount: number) => void
) => <T>(source: Observable<T>): Observable<T> => {
let refCount = 0;
// mute the operator if it has nothing to do
if (typeof onChange !== 'function') {
return source;
}
// mute errors from side-effects
const safeOnChange = (refCount, prevRefCount) => {
try {
onChange(refCount, prevRefCount);
} catch (e) {
}
};
// spy on subscribe
return Observable.create((observer: Observer<T>) => {
const subscription = source.subscribe(observer);
const prevRefCount = refCount;
refCount++;
safeOnChange(refCount, prevRefCount);
// spy on unsubscribe
return () => {
subscription.unsubscribe();
const prevRefCount = refCount;
refCount--;
safeOnChange(refCount, prevRefCount);
};
}) as Observable<T>;
};
这篇关于检查 publishReplay().refCount() 是否有观察者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!