制作一个懒惰的、缓存的 observable,只执行一次源代码 [英] Making a lazy, cached observable that only execute the source once

查看:24
本文介绍了制作一个懒惰的、缓存的 observable,只执行一次源代码的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 rxjs observable 在应用程序的整个生命周期中委派但共享一项昂贵的工作.

I'm trying to use an rxjs observable to delegate, but share, a piece of expensive work across the lifetime of an application.

本质上,类似于:

var work$ = Observable.create((o) => {
  const expensive = doSomethingExpensive();
  o.next(expensive);
  observer.complete();
})
.publishReplay(1)
.refCount();

现在,这工作正常并且完全符合我的要求,除了一件事:如果所有订阅者都取消订阅,那么当下一个订阅者订阅时,我的昂贵工作会再次发生.我想保留它.

Now, this works fine and does exactly what I want, except for one thing: if all subscribers unsubscribe, then when the next one subscribes, my expensive work happens again. I want to keep it.

现在,我可以使用一个主题,或者我可以删除 refCount() 并手动使用连接(并且永远不会断开连接).但这会使昂贵的工作在我连接的那一刻发生,而不是订阅者第一次尝试使用 work$.

now, I could use a subject, or I could remove the refCount() and use connect manually (and never disconnect). But that would make the expensive work happen the moment I connect, not the first time a subscriber tries to consume work$.

本质上,我想要一些类似于 refCount 的东西,它只查看要连接的第一个订阅,而永远不会断开连接.懒惰连接".

Essentially, I want something akin to refCount that only looks at the first subscription to connect, and never disconnect. A "lazy connect".

这种事情有可能吗?

推荐答案

publishReplay() 实际工作

它在内部创建了一个 ReplaySubject 并使其与 multicast 兼容.ReplaySubject 的最小重放值为 1 次发射.结果如下:

How does publishReplay() actually work

It internally creates a ReplaySubject and makes it multicast compatible. The minimal replay value of ReplaySubject is 1 emission. This results in the following:

  • 第一次订阅将触发 publishReplay(1) 以在内部订阅源流并通过 ReplaySubject 管道传输所有排放,从而有效地缓存最后一个 n(=1) 排放
  • 如果在源仍处于活动状态时启动第二个订阅,multicast() 会将我们连接到相同的 replaySubject,我们将收到所有下一个发射,直到源流完成.
  • 如果在源已经完成之后开始订阅,replaySubject 会缓存最后的 n 个发射,并且它只会在完成之前接收那些发射.
  • First subscription will trigger the publishReplay(1) to internally subscribe to the source stream and pipe all emissions through the ReplaySubject, effectively caching the last n(=1) emissions
  • If a second subscription is started while the source is still active the multicast() will connect us to the same replaySubject and we will receive all next emissions until the source stream completes.
  • If a subscription is started after the source is already completed the replaySubject has cached the last n emissions and it will only receive those before completing.

const source = Rx.Observable.from([1,2])
  .mergeMap(i => Rx.Observable.of('emission:'+i).delay(i * 100))
  .do(null,null,() => console.log('source stream completed'))
  .publishReplay(1)
  .refCount();

// two subscriptions which are both in time before the stream completes
source.subscribe(val => console.log(`sub1:${val}`), null, () => console.log('sub1 completed'));
source.subscribe(val => console.log(`sub2:${val}`), null, () => console.log('sub2 completed'));

// new subscription after the stream has completed already
setTimeout(() => {
  source.subscribe(val => console.log(`sub_late-to-the-party:${val}`), null, () => console.log('sub_late-to-the-party completed'));
}, 500);

<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

这篇关于制作一个懒惰的、缓存的 observable,只执行一次源代码的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆