超时后,ReactiveX 发出空值或哨兵值 [英] ReactiveX emit null or sentinel value after timeout
问题描述
寻找一种干净的方法来转换源 Observable
以在一段时间内不发出项目后发出单个 null
(或哨兵值).
Looking for a clean way to transform a source Observable
to emit a single null
(or sentinel value) after not emitting an item for some duration.
例如,如果源 observable 发出 1, 2, 3
然后在发出 4, 5, 6
之前停止发出 10 秒我希望发出的项目为 1, 2, 3, null, 4, 5, 6
.
For example, if the source observable emits 1, 2, 3
then stops emitting for 10 seconds before emitting 4, 5, 6
I would like the emitted items to be 1, 2, 3, null, 4, 5, 6
.
用例用于在 UI 中显示值,如果最后发出的值是陈旧的/旧的,则显示的值应该变成破折号 -
或 N/A
.
The use case is for displaying values in a UI where the displayed value should turn into a dash -
or N/A
if the last emitted value is stale/old.
我查看了 timeout
操作符,但是当超时发生时它会终止 Observable
,这是不希望的.
I looked into the timeout
operator but it terminates the Observable
when the timeout occurs which is undesirable.
使用 RxJava.
推荐答案
基于 akarnokd 的回答 和 类似问题的答案,另一种实现:
Based on akarnokd's answer and an answer in a similar question, an alternative implementation:
如果您正在寻找一个值来表示两次排放之间的时间间隔:
If you're looking for a single value to indicate the lapse of time between emissions:
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final long duration = 100;
final Observable<Integer> timeout = Observable.just(-1).delay(duration, TimeUnit.MILLISECONDS, scheduler)
.concatWith(Observable.never())
.takeUntil(subject)
.repeat();
subject.mergeWith(timeout).subscribe(subscriber);
subject.onNext(1, 0);
subject.onNext(2, 100);
subject.onNext(3, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
subject.onNext(4, 0);
subject.onNext(5, 100);
subject.onNext(6, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));
连续哨兵值
如果您希望在源 observable 在一段时间内不发射后继续接收值:
Continuous sentinel values
If you're looking to receive values continuously after the source observable not emitting for some duration:
final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();
final long duration = 100;
final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
.map(x -> -1)
.takeUntil(subject)
.repeat();
subject.mergeWith(timeout).subscribe(subscriber);
subject.onNext(1, 0);
subject.onNext(2, 100);
subject.onNext(3, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);
subject.onNext(4, 0);
subject.onNext(5, 100);
subject.onNext(6, 200);
scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));
<小时>
区别在于 timeout
observable 以及它是否重复发生.
The difference being the timeout
observable and whether it's recurring or not.
您可以根据需要将 -1
替换为 null
.
You can replace -1
with null
as needed.
以上所有内容均使用 Java 1.8.0_72
使用 RxJava 1.0.17 进行测试.
All of the above is tested with RxJava 1.0.17 using Java 1.8.0_72
.
这篇关于超时后,ReactiveX 发出空值或哨兵值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!