超时后,ReactiveX 发出空值或哨兵值 [英] ReactiveX emit null or sentinel value after timeout

查看:33
本文介绍了超时后,ReactiveX 发出空值或哨兵值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

寻找一种干净的方法来转换源 Observable 以在一段时间内不发出项目后发出单个 null(或哨兵值).

Looking for a clean way to transform a source Observable to emit a single null (or sentinel value) after not emitting an item for some duration.

例如,如果源 observable 发出 1, 2, 3 然后在发出 4, 5, 6 之前停止发出 10 秒我希望发出的项目为 1, 2, 3, null, 4, 5, 6.

For example, if the source observable emits 1, 2, 3 then stops emitting for 10 seconds before emitting 4, 5, 6 I would like the emitted items to be 1, 2, 3, null, 4, 5, 6.

用例用于在 UI 中显示值,如果最后发出的值是陈旧的/旧的,则显示的值应该变成破折号 -N/A.

The use case is for displaying values in a UI where the displayed value should turn into a dash - or N/A if the last emitted value is stale/old.

我查看了 timeout 操作符,但是当超时发生时它会终止 Observable,这是不希望的.

I looked into the timeout operator but it terminates the Observable when the timeout occurs which is undesirable.

使用 RxJava.

推荐答案

基于 akarnokd 的回答类似问题的答案,另一种实现:

Based on akarnokd's answer and an answer in a similar question, an alternative implementation:

如果您正在寻找一个值来表示两次排放之间的时间间隔:

If you're looking for a single value to indicate the lapse of time between emissions:

final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

final long duration = 100;
final Observable<Integer> timeout = Observable.just(-1).delay(duration, TimeUnit.MILLISECONDS, scheduler)
    .concatWith(Observable.never())
    .takeUntil(subject)
    .repeat();

subject.mergeWith(timeout).subscribe(subscriber);

subject.onNext(1,   0);
subject.onNext(2, 100);
subject.onNext(3, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

subject.onNext(4,   0);
subject.onNext(5, 100);
subject.onNext(6, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));

连续哨兵值

如果您希望在源 observable 在一段时间内不发射后继续接收值:

Continuous sentinel values

If you're looking to receive values continuously after the source observable not emitting for some duration:

final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

final long duration = 100;
final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
    .map(x -> -1)
    .takeUntil(subject)
    .repeat();

subject.mergeWith(timeout).subscribe(subscriber);

subject.onNext(1,   0);
subject.onNext(2, 100);
subject.onNext(3, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

subject.onNext(4,   0);
subject.onNext(5, 100);
subject.onNext(6, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));

<小时>

区别在于 timeout observable 以及它是否重复发生.


The difference being the timeout observable and whether it's recurring or not.

您可以根据需要将 -1 替换为 null.

You can replace -1 with null as needed.

以上所有内容均使用 Java 1.8.0_72 使用 RxJava 1.0.17 进行测试.

All of the above is tested with RxJava 1.0.17 using Java 1.8.0_72.

这篇关于超时后,ReactiveX 发出空值或哨兵值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆