RxJS中的同步性 [英] Synchronicity in RxJS

查看:845
本文介绍了RxJS中的同步性的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我希望以下代码可以异步运行:

I would expect that the following code would run asynchronously:

var range = Rx.Observable.range(0, 3000000);

range.subscribe(
  function(x) {},
  function(err) {},
  function() {
    console.log('Completed');
});

console.log('Hello World');

但事实并非如此。需要一段时间才能完成大范围的数字,只有当它完成后才能恢复执行,你可以尝试代码这里

But that's not the case. It takes a while to go through the big range of numbers and only when it is completed the execution is resumed, you can try the code here.

我很困惑何时期望RxJS同步或异步运行。它取决于使用的方法吗?我之前的想法是,一旦我们进入Observables / Observer土地,其中的所有内容都是异步运行的,类似于承诺的工作方式。

I am confused as to when to expect RxJS to behave synchronously or asynchronously. Does it depend on the method used? My previous idea was that once we are in Observables/Observer land, everything in it runs asynchronously, similar to how promises work.

推荐答案

RxJ遵循与Rx.Net相同的规则。默认情况下,每个可观察操作符使用执行其工作所需的最小异步数量。在这种情况下, Range 可以同步运行数字,因此它可以(其文档告诉你它将使用 Rx.Scheduler.currentThread by默认。

RxJs follows the same rules as Rx.Net. By default, each observable operator uses the minimum amount of asynchronicity needed to do its work. In this case, Range can run through the numbers synchronously and so it does (its documentation tells you it will use the Rx.Scheduler.currentThread by default.

如果你想引入比操作所需更多的异步性,你需要告诉它使用不同的 Scheduler

If you want to introduce more asynchronicity than is needed for an operation, you need to tell it to use a different Scheduler.

为了获得你的行为期待,你想使用 Rx.Scheduler.timeout 。这实质上会导致它通过 setTimeout 。(实际上并非如此简单,调度程序将使用浏览器中可用的最快方法来安排延期工作)。

To get the behavior you were expecting, you want to use Rx.Scheduler.timeout. This will, in essence, cause it to schedule each iteration via setTimeout. (In actuality it is not this simple, the scheduler will use the fastest method available in the browser to schedule deferred work).

var range = Rx.Observable.range(0, 3000000, Rx.Scheduler.timeout);

更新了jsFiddle

请注意,通过 setTimeout 迭代300万个数字将几乎永远。所以也许我们想要分批处理它们。所以在这里我们将利用 Range 的默认行为来同步运行,然后批量处理并使用 observeOn 来通过我们的超时调度程序运行批处理:

Note that iterating through 3 million numbers via setTimeout will take nearly forever. So maybe we want to process them in batches of 1,000. So here we will take advantage of the default behavior of Range to run synchronously, then batch the values and use observeOn to run the batches via our timeout scheduler:

var range = Rx.Observable
    .range(0, 3000000)
    .bufferWithCount(1000)
    .observeOn(Rx.Scheduler.timeout) // run each buffer via setTimeout
    .select(function (buffer, i) {
       console.log("processing buffer", i);
       return Rx.Observable.fromArray(buffer);
     })
    .concatAll(); // concat the buffers together

jsFiddle 请注意,开头有一个延迟,而范围曲解所有3,000,000个值并且 bufferWithCount 产生3,000个阵列。对于真正的生产代码来说,这种东西很不寻常,因为你的数据源并不像 Observable.range 那么简单。

jsFiddle Note that there is a delay at the beginning while range cranks through all 3,000,000 values and bufferWithCount produces 3,000 arrays. This sort of stuff is unusual for real production code where your data source is not as trivial as Observable.range.

在这方面,FYI的承诺没有任何不同。如果您对已经完成的承诺调用然后,那么然后函数可能会同步运行。所有Promises和Observable实际上都提供了一个接口,通过该接口,您可以提供在满足条件时保证运行的回调,条件是否已满足或稍后是否满足。然后,RxJs提供了许多机制来强制某些东西以异步方式运行,如果你真的想要那样的话。以及引入特定时间的方法。

FYI promises are not any different in this respect. If you call then on a promise that is already completed, that then function might run synchronously. All Promises and Observables really do is present an interface by which you can provide callbacks that are guaranteed to run when the condition is met, whether the condition is already met or will be met later. RxJs then provides many many mechanisms to force something to be run asynchronously if you really want it that way. And methods to introduce specific timings.

这篇关于RxJS中的同步性的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆