制作一个懒惰的、缓存的 observable,只执行一次源代码 [英] Making a lazy, cached observable that only execute the source once
问题描述
我正在尝试使用 rxjs observable 在应用程序的整个生命周期中委派但共享一项昂贵的工作.
I'm trying to use an rxjs observable to delegate, but share, a piece of expensive work across the lifetime of an application.
本质上,类似于:
var work$ = Observable.create((o) => {
const expensive = doSomethingExpensive();
o.next(expensive);
observer.complete();
})
.publishReplay(1)
.refCount();
现在,这工作正常并且完全符合我的要求,除了一件事:如果所有订阅者都取消订阅,那么当下一个订阅者订阅时,我的昂贵工作会再次发生.我想保留它.
Now, this works fine and does exactly what I want, except for one thing: if all subscribers unsubscribe, then when the next one subscribes, my expensive work happens again. I want to keep it.
现在,我可以使用一个主题,或者我可以删除 refCount() 并手动使用连接(并且永远不会断开连接).但这会使昂贵的工作在我连接的那一刻发生,而不是订阅者第一次尝试使用 work$.
now, I could use a subject, or I could remove the refCount() and use connect manually (and never disconnect). But that would make the expensive work happen the moment I connect, not the first time a subscriber tries to consume work$.
本质上,我想要一些类似于 refCount 的东西,它只查看要连接的第一个订阅,而永远不会断开连接.懒惰连接".
Essentially, I want something akin to refCount that only looks at the first subscription to connect, and never disconnect. A "lazy connect".
这种事情有可能吗?
推荐答案
publishReplay()
实际工作
它在内部创建了一个 ReplaySubject
并使其与 multicast
兼容.ReplaySubject
的最小重放值为 1 次发射.结果如下:
How does publishReplay()
actually work
It internally creates a ReplaySubject
and makes it multicast
compatible. The minimal replay value of ReplaySubject
is 1 emission. This results in the following:
- 第一次订阅将触发
publishReplay(1)
以在内部订阅源流并通过ReplaySubject
管道传输所有排放,从而有效地缓存最后一个 n(=1) 排放 - 如果在源仍处于活动状态时启动第二个订阅,
multicast()
会将我们连接到相同的replaySubject
,我们将收到所有下一个发射,直到源流完成. - 如果在源已经完成之后开始订阅,replaySubject 会缓存最后的 n 个发射,并且它只会在完成之前接收那些发射.
- First subscription will trigger the
publishReplay(1)
to internally subscribe to the source stream and pipe all emissions through theReplaySubject
, effectively caching the last n(=1) emissions - If a second subscription is started while the source is still active the
multicast()
will connect us to the samereplaySubject
and we will receive all next emissions until the source stream completes. - If a subscription is started after the source is already completed the replaySubject has cached the last n emissions and it will only receive those before completing.
const source = Rx.Observable.from([1,2])
.mergeMap(i => Rx.Observable.of('emission:'+i).delay(i * 100))
.do(null,null,() => console.log('source stream completed'))
.publishReplay(1)
.refCount();
// two subscriptions which are both in time before the stream completes
source.subscribe(val => console.log(`sub1:${val}`), null, () => console.log('sub1 completed'));
source.subscribe(val => console.log(`sub2:${val}`), null, () => console.log('sub2 completed'));
// new subscription after the stream has completed already
setTimeout(() => {
source.subscribe(val => console.log(`sub_late-to-the-party:${val}`), null, () => console.log('sub_late-to-the-party completed'));
}, 500);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
这篇关于制作一个懒惰的、缓存的 observable,只执行一次源代码的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!