rxjs 5 发布重放引用计数 [英] rxjs 5 publishReplay refCount

查看:30
本文介绍了rxjs 5 发布重放引用计数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我无法弄清楚 publishReplay().refCount() 是如何工作的.

I can't figure out how publishReplay().refCount() works.

例如(https://jsfiddle.net/7o3a45L1/):

var source = Rx.Observable.create(observer =>  {
  console.log("call"); 
  // expensive http request
  observer.next(5);
}).publishReplay().refCount();

subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)});
subscription1.unsubscribe();
console.log(""); 

subscription2 = source.subscribe({next: (v) => console.log('observerB: ' + v)});
subscription2.unsubscribe();
console.log(""); 

subscription3 = source.subscribe({next: (v) => console.log('observerC: ' + v)});
subscription3.unsubscribe();
console.log(""); 

subscription4 = source.subscribe({next: (v) => console.log('observerD: ' + v)});
subscription4.unsubscribe();

给出以下结果:

调用观察者A:5

observerB: 5 调用observerB: 5

observerB: 5 call observerB: 5

observerC:5 观察者C:5 调用观察者C:5

observerC: 5 observerC: 5 call observerC: 5

观察者D:5观察者D:5观察者D:5调用观察者D:5

observerD: 5 observerD: 5 observerD: 5 call observerD: 5

1) 为什么observerB、C 和D 被多次调用?

1) Why are observerB, C and D called multiple times?

2) 为什么call"打印在每一行而不是行首?

2) Why "call" is printed on each line and not in the beginning of the line?

此外,如果我调用publishReplay(1).refCount(),它会分别调用observerB、C 和D 2 次.

Also, if i call publishReplay(1).refCount(), it calls observerB, C and D 2 times each.

我期望每个新观察者都只收到一次值 5 并且call"只打印一次.

What i expect is that every new observer receives the value 5 exactly once and "call" is printed only once.

推荐答案

publishReplay(x).refCount() combine 执行以下操作:

publishReplay(x).refCount() combined does the following:

  • 它创建了一个 ReplaySubject,它可以重放 x 个发射.如果 x 未定义,则它会重放完整的流.
  • 它使用 refCount() 运算符使此 ReplaySubject 多播兼容.这会导致并发订阅收到相同的排放量.
  • It create a ReplaySubject which replay up to x emissions. If x is not defined then it replays the complete stream.
  • It makes this ReplaySubject multicast compatible using a refCount() operator. This results in concurrent subscriptions receiving the same emissions.

您的示例包含一些问题,使它们如何协同工作变得模糊不清.请参阅以下修改后的片段:

Your example contains a few issues clouding how it all works together. See the following revised snippet:

var state = 5
var realSource = Rx.Observable.create(observer =>  {
  console.log("creating expensive HTTP-based emission"); 
  observer.next(state++);
//  observer.complete();
  
  return () => {
    console.log('unsubscribing from source')
  }
});


var source = Rx.Observable.of('')
  .do(() => console.log('stream subscribed'))
  .ignoreElements()
  .concat(realSource)
.do(null, null, () => console.log('stream completed'))
.publishReplay()
.refCount()
;
    
subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)});
subscription1.unsubscribe();
 
subscription2 = source.subscribe(v => console.log('observerB: ' + v));
subscription2.unsubscribe();
    
subscription3 = source.subscribe(v => console.log('observerC: ' + v));
subscription3.unsubscribe();
    
subscription4 = source.subscribe(v => console.log('observerD: ' + v));
 

<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

运行此代码段时,我们可以清楚地看到它没有为 Observer D 发出重复的值,它实际上为每个订阅创建了新的排放.怎么来的?

When running this snippet we can see clearly that it is not emitting duplicate values for Observer D, it is in fact creating new emissions for every subscription. How come?

每次订阅都会在下一次订阅发生之前取消订阅.这有效地使 refCount 减少回零,没有进行多播.

Every subscription is unsubscribed before the next subscription takes place. This effectively makes the refCount decrease back to zero, no multicasting is being done.

问题在于 realSource 流没有完成.因为我们没有多播,所以下一个订阅者通过 ReplaySubject 获取 realSource 的新实例,并且新的排放在之前已经排放的排放之前.

The issue resides in the fact that the realSource stream does not complete. Because we are not multicasting the next subscriber gets a fresh instance of realSource through the ReplaySubject and the new emissions are prepended with the previous already emitted emissions.

因此,要修复流多次调用昂贵的 HTTP 请求,您必须完成流,以便 publishReplay 知道它不需要重新订阅.

So to fix your stream from invoking the expensive HTTP request multiple times you have to complete the stream so the publishReplay knows it does not need to re-subscribe.

这篇关于rxjs 5 发布重放引用计数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆