如何在RXJS中传输异步流? [英] How are asynchronous streams transmitted in RXJS?

查看:80
本文介绍了如何在RXJS中传输异步流?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图了解流是如何通过RXjs中的管道传输的。

我知道这不应该是一个问题,因为这是异步流的整个想法 - 但仍然有一些东西我希望了解。

I'm trying to understand how the stream is transmitted through the pipe in RXjs.
I know that this should not be a concern because that's the whole idea with async streams - but still there's something I want to understand.

查看此代码:

var source = Rx.Observable
    .range(1, 3)
    .flatMapLatest(function (x) {  //`switch` these days...
        return Rx.Observable.range(x*100, 2);
    });


 source.subscribe(value => console.log('I got a value ', value))

结果:

I got a value 100
I got a value 200
I got a value 300
I got a value 301

我相信(IIUC)图表是这样的:(通知标记为101,201,取消订阅)

I believe (IIUC) that the diagram is something like this : (notice striked 101,201 which are unsubscribed)

----1---------2------------------3------------------------------|

░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
-----100-------(-̶1̶0̶1̶)-------200---(-̶2̶0̶1̶)-----300------301-------------

以下是问题:

问题:

是否始终保证2将在(101)之前到达?那个3是在(201)之前到达的吗?

Is it always guaranteed that 2 will arrive before the (101) ? same as that 3 is arriving before (201) ?

我的意思是 - 如果我不想看一个时间线,那么下面的图表是完全合法的发生:

I mean - if I'm not suppose to look at a time line so it is perfectly legal for the following diagram to occur :

----1---------------2---------------3------------------------------|

░░░░░░░░flatMapLatest(x=>Rx.Observable.range(x*100, 2))░░░░░░░░
-----100-------101------200---201-----300------301-------------

其中 2 稍微延迟到达,其中101是已经发出

Where 2 arrived with a slight delay where 101 was already emitted

我在这里缺少什么?管道如何工作?

What am I missing here? How does the pipe work here ?

推荐答案

对于具有此特定RxJS版本的特定Observable链,排放的顺序始终如一是相同的。

For this particular Observable chain with this particular RxJS version the order of emissions is going to always be the same.

如前所述,在RxJS 4中,它使用 currentThread 调度程序,如下所示: https://github.com/Reactive-扩展/ RxJS / blob / master / src / core / perf / operators / range.js#L39

所有调度程序(立即来自RxJS 4)内部使用某种类型的队列所以顺序总是相同的。

As already mentioned, in RxJS 4 it uses the currentThread scheduler as you can see here: https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/perf/operators/range.js#L39.
All schedulers (except the immediate from RxJS 4) are internally using some type of a queue so the order is always the same.

事件的顺序与你在图中显示的非常相似(.. 。或者至少我认为是):

The order of events is very similar to what you showed in the diagram (... or at least I think it is):


  1. 1 被安排和发出因为它是队列中的唯一动作。

  2. 100 已安排。此时,调度程序队列中没有其他操作,因为尚未安排 2 RangeObservable 在调用 onNext() 后递归调度另一个发射。这意味着 100 安排在 2 之前。

  3. 2 已安排

  4. 100 已发出, 101 已安排

  5. 2 已发出, 101 已被处置。

  6. ...等等

  1. 1 is scheduled and emitted because it's the only action in the queue.
  2. 100 is scheduled. At this point there are no more action in the Scheduler's queue because 2 hasn't been scheduled yet. The RangeObservable schedules another emission recursively after it calls onNext(). This means that 100 is scheduled before 2.
  3. 2 is scheduled.
  4. 100 is emitted, 101 is scheduled
  5. 2 is emitted, 101 is disposed.
  6. ... and so on

请注意,此行为在RxJS 4和RxJS 5中有所不同。

Note that this behavior is different in RxJS 4 and RxJS 5.

在RxJS 5中,大多数Observable和运算符默认情况下不使用任何Scheduler(一个明显的例外是Observables / operator需要处理延迟)。所以在 RxJS 5 RangeObservable 不会安排任何事情并立即开始在循环中发布值。

In RxJS 5 most Observables and operators by default don't use any Scheduler (an obvious exception are Observables/operator that need to work with delays). So in RxJS 5 the RangeObservable won't schedule anything and start emitting values right away in a loop.

RxJS 5中的相同示例将产生不同的结果结果:

The same example in RxJS 5 will produce different result:

const source = Observable
  .range(1, 3)
  .switchMap(function (x) {
    return Observable.range(x * 100, 2);
  });

source.subscribe(value => console.log('I got a value ', value));

这将打印以下内容:

I got a value  100
I got a value  101
I got a value  200
I got a value  201
I got a value  300
I got a value  301

但是,如果你添加例如延迟(0)。常识表明这不应该做任何事情:

However, this will change significantly if you add for example delay(0). The common sense suggests that this shouldn't do anything:

const source = Observable
  .range(1, 3)
  .switchMap(function (x) {
    return Observable.range(x * 100, 2).delay(0);
  });

source.subscribe(value => console.log('I got a value ', value));

现在只安排内部 RangeObservable 并且重复处理几次,这使得它只发出最后一个 RangeObservable 的值:

Now only the inner RangeObservable is scheduled and disposed all over again several times which makes it emit only values from the very the last RangeObservable:

I got a value  300
I got a value  301

这篇关于如何在RXJS中传输异步流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆