如果 observable 在 X 时间内没有发出值,则产生副作用 [英] Do side effect if observable has not emitted a value within X amount of time
问题描述
我正在研究一个用例,它要求如果 observable 在一定时间内没有发出值,那么我们应该做一些副作用.
I'm working on a use case that requires that if an observable has not emitted a value within a certain amount of time then we should do some side effect.
给出一个实际用例:
- 打开网络套接字连接
- 如果在 X 时间内没有发送/接收消息,则关闭网络套接字连接并通知用户
这需要在每个发出的值和初始订阅 observable 时启动一个计时器,然后在分配的时间后运行一些函数,或者直到发出一个计时器重置的值.我正在努力以 Rx 的方式做到这一点.任何帮助将不胜感激:)
This requires for a timer to be initiated on every emitted value and upon initial subscription of observable which will then run some function after the allotted time or until a value is emitted in which the timer resets. I'm struggling to do this the Rx way. Any help would be appreciated :)
推荐答案
debounceTime
是您正在寻找的运算符:它仅在特定超时内没有其他人跟随时才发出值.侦听 debounce
d 流的第一条消息将使您超时并清理您的 websocket 连接.如果您需要从流的打开开始超时,您可以简单地startWith
.具体:
debounceTime
is the operator you're looking for: it only emits a value if no others follow within a specific timeout. Listening for the first message of the debounce
d stream will let you time out and clean up your websocket connection. If you need to time out starting from the opening of the stream, you can simply startWith
. Concretely:
messages$.startWith(null)
.debounceTime(timeout)
.take(1)
.subscribe(() => { /* side effects */ });
<小时>
如果您希望在超时时完全结束消息流(例如,您在 onComplete 处理程序中进行清理),只需填入 debounceTime
进入takeUntil
:
if instead you're looking to end the a message stream entirely when it times out (e.g. you clean up in the onComplete handler), just cram debounceTime
into a takeUntil
:
messages$.takeUntil(
messages$.startWith(null)
.debounceTime(timeout)
).subscribe(timeout_observer);
使用 timeout_observable: Observer
包含您的 onComplete 清理.
With a timeout_observable: Observer<TMessage>
that contains your cleanup onComplete.
这篇关于如果 observable 在 X 时间内没有发出值,则产生副作用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!