RxJS:为什么内部 observable 首先触发? [英] RxJS: why is inner observable firing first?

查看:36
本文介绍了RxJS:为什么内部 observable 首先触发?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 RxJS 6 并且我有以下示例问题:

I am using RxJS 6 and I have the following example problem:

我们希望为指定的 bufferTime 缓冲元素,但如果在超过 bufferTime 的一段时间内没有发生任何事情,我们希望第一个元素立即触发.

We want to buffer elements for a specified bufferTime but if nothing happend for some amount of time greater than bufferTime we want the first element to fire immediately.

顺序:

[------bufferTime------]

Input over time:
[1, 2, 3, -------------|---4, 5, 6 ----------------]


Output over time:
[1]-----------------[2,3]---[4]------------------[5,6]

这是让我到达那里的代码:

This is the code that gets me there:

source$.pipe(
  buffer(source$.pipe(
    throttleTime(bufferTime, asyncScheduler, {leading: true, trailing: true}),
    delay(10) // <-- This here bugs me like crazy though!
  )
)

我的问题是关于 delay 运算符.当我省略它时,缓冲区会以空列表触发,因为 $source.pipe(throttleTime(...)) 比缓冲区步骤快.

My question is about the delay operator. When I omit it, the buffer fires with an empty list because the $source.pipe(throttleTime(...)) is faster than the buffer step.

没有延迟

[------bufferTime------]

Input over time:
[1, 2, 3, -------------|---4, 5, 6 ----------------]


Output over time:
[]------------------[1,2,3]--[]------------------[4,5,6]

有没有办法摆脱延迟?

推荐答案

问题是当新值到来时,观察者订阅流的顺序被保留:第一个订阅的将首先收到该值,然后下一个,依此类推.

The problem is that the order in which the observers subscribe to a stream is preserved when a new value comes: The first that subscribed will receive the value first, then the next one, and so on.

v6buffer会先订阅内层流,内层流会先订阅source$,然后它(buffer)会订阅source$> 排在第二位.澄清一下,当一个新值进来时,第一个接收到新值的将是内部 observable,因为它在 buffer 之前订阅了源.

In v6, buffer will first subscribe to the inner stream, which will subscribe to source$ first, and then it (buffer) will subscribe to source$ in second place. To clarify, when a new value comes in, the first one to receive the new value will be the inner observable, because it subscribed to the source before buffer did.

v7 中换掉了,所以它应该不需要任何额外的工作

In v7 that's swapped out, so it should work without any extra work

作为 v6 的更简洁的解决方案,有一个运算符可以帮助您:subscribeOn.有了这个,您可以指定在订阅操作员时使用的调度程序,在您的情况下,通过使用带有 asapScheduler 的微任务,它应该可以工作:

As a cleaner solution for v6, there's an operator that can help you: subscribeOn. With this you can specify a scheduler to use when subscribing to the operator, in your case by using a micro-task with asapScheduler it should work:

source$.pipe(
  buffer(source$.pipe(
    subscribeOn(asapScheduler),
    throttleTime(bufferTime, asyncScheduler, {leading: true, trailing: true}),
  )
)

这样对源的内部订阅将在 缓冲区订阅后推迟.

This way the inner subscription to source will be postponed after the buffer has subscribed to it.

请注意,如果您要使用此实现创建自定义运算符,您可能希望使用重载 publish(multicasted$ => ...) 多播 source$,否则此代码段将对 source$ 进行 2 次订阅(可能会发送两次请求?)

Note that if you're creating a custom operator with this implementation, you might want to multicast the source$ with the overload publish(multicasted$ => ...), otherwise this snippet will make 2 subscriptions to source$ (potentially sending a request twice?)

这篇关于RxJS:为什么内部 observable 首先触发?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆