RxJS:为什么内部 observable 首先触发? [英] RxJS: why is inner observable firing first?
问题描述
我正在使用 RxJS 6 并且我有以下示例问题:
I am using RxJS 6 and I have the following example problem:
我们希望为指定的 bufferTime
缓冲元素,但如果在超过 bufferTime
的一段时间内没有发生任何事情,我们希望第一个元素立即触发.
We want to buffer elements for a specified bufferTime
but if nothing happend for some amount of time greater than bufferTime
we want the first element to fire immediately.
顺序:
[------bufferTime------]
Input over time:
[1, 2, 3, -------------|---4, 5, 6 ----------------]
Output over time:
[1]-----------------[2,3]---[4]------------------[5,6]
这是让我到达那里的代码:
This is the code that gets me there:
source$.pipe(
buffer(source$.pipe(
throttleTime(bufferTime, asyncScheduler, {leading: true, trailing: true}),
delay(10) // <-- This here bugs me like crazy though!
)
)
我的问题是关于 delay
运算符.当我省略它时,缓冲区会以空列表触发,因为 $source.pipe(throttleTime(...))
比缓冲区步骤快.
My question is about the delay
operator. When I omit it, the buffer fires with an empty list because the $source.pipe(throttleTime(...))
is faster than the buffer step.
没有延迟
[------bufferTime------]
Input over time:
[1, 2, 3, -------------|---4, 5, 6 ----------------]
Output over time:
[]------------------[1,2,3]--[]------------------[4,5,6]
有没有办法摆脱延迟
?
推荐答案
问题是当新值到来时,观察者订阅流的顺序被保留:第一个订阅的将首先收到该值,然后下一个,依此类推.
The problem is that the order in which the observers subscribe to a stream is preserved when a new value comes: The first that subscribed will receive the value first, then the next one, and so on.
在 v6、buffer
会先订阅内层流,内层流会先订阅source$
,然后它(buffer)会订阅source$
> 排在第二位.澄清一下,当一个新值进来时,第一个接收到新值的将是内部 observable,因为它在 buffer
之前订阅了源.
In v6, buffer
will first subscribe to the inner stream, which will subscribe to source$
first, and then it (buffer) will subscribe to source$
in second place. To clarify, when a new value comes in, the first one to receive the new value will be the inner observable, because it subscribed to the source before buffer
did.
在 v7 中换掉了,所以它应该不需要任何额外的工作
In v7 that's swapped out, so it should work without any extra work
作为 v6 的更简洁的解决方案,有一个运算符可以帮助您:subscribeOn
.有了这个,您可以指定在订阅操作员时使用的调度程序,在您的情况下,通过使用带有 asapScheduler
的微任务,它应该可以工作:
As a cleaner solution for v6, there's an operator that can help you: subscribeOn
. With this you can specify a scheduler to use when subscribing to the operator, in your case by using a micro-task with asapScheduler
it should work:
source$.pipe(
buffer(source$.pipe(
subscribeOn(asapScheduler),
throttleTime(bufferTime, asyncScheduler, {leading: true, trailing: true}),
)
)
这样对源的内部订阅将在 缓冲区订阅后推迟.
This way the inner subscription to source will be postponed after the buffer has subscribed to it.
请注意,如果您要使用此实现创建自定义运算符,您可能希望使用重载 publish(multicasted$ => ...)
多播 source$,否则此代码段将对 source$
进行 2 次订阅(可能会发送两次请求?)
Note that if you're creating a custom operator with this implementation, you might want to multicast the source$ with the overload publish(multicasted$ => ...)
, otherwise this snippet will make 2 subscriptions to source$
(potentially sending a request twice?)
这篇关于RxJS:为什么内部 observable 首先触发?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!