Rxjava:takeUntil 跳过完成事件 [英] Rxjava : takeUntil skip complete event

查看:72
本文介绍了Rxjava:takeUntil 跳过完成事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

TakeUntil
在第二个 Observable 发出项目或终止后丢弃任何由 Observable 发出的项目

TakeUntil
discard any items emitted by an Observable after a second Observable emits an item or terminates

在使用 takeUntil 操作符时,有没有办法跳过 complete 事件?

Is there a way to skip the complete event when using takeUntil operator ?

Observable<Long> publish = source.publish(
        multicast -> multicast.flatMapMaybe(
                o -> Observable.interval(0, 500, TimeUnit.MILLISECONDS, scheduler)
                        .takeUntil(multicast)
                        .reduce(Long::sum)
        )
);

此图说明了上述代码的生成结果:

This diagram illustrates the generated result for the code above :

//events: --------x-----1----2---1---x-----3--0--------x-1---1----|  
//result: ---------------------------4-----------------3----------2  

我想要的是处理 43 事件与从 complete<生成的最后一个事件 2 不同source 的/code> 事件.

What I want is to process 4 and 3 events differently from the last event 2 which is generated from the complete event of the source.

推荐答案

这个 issue 是 64676462

This issue is a followup of 64676462

我的结果并不漂亮,但我认为它确实有效.您的用例不是那么容易解决,因为您不知道发出的值是最后一个",因为它是一个流.你不能发出最后"value 并且知道它是最后一个,因为emit 和onComplete 是两个原子操作.您将需要延迟发出最后一个"值并等待 onComplete 事件,以确保last"value 确实是最后一个值.

My result is not pretty, but I think it does work. Your use-case is not that easy to resolve, because you do not know that a emitted value is "last", becase it is a stream. You can not emit the "last" value and know, that it is the last, because the emit and the onComplete are two atomar operations. You would need to delay emitting the "last" value and wait for the onComplete event, in order to be sure, that the "last" value is really the last value.

你也可以作弊,就像我在这里做的:

You could also cheat a little bit, like I did here:

注意:当源 observable 和内部订阅的 observable 完成时,一个 flatMap 就完成了.因此,当源完成时,我们必须确保内部流也完成.这是通过使用另一个#takeUntil 实现的,它通过#materialize 侦听来自源的onComplete 事件.reduce 之后的 #takeUntil 确保不会向下游发出 reduce 值.最后,#switchIfEmpty 会将 onComplete 事件从last"转换为value 到另一个值,因为 observable 没有发出任何值.

NOTE: a flatMap completes, when the source observable and the inner subscribed observable complete. Therefore we have to make sure, that the inner stream also completes, when the source completes. This is achieved by using another #takeUntil, which listens to the onComplete-event from the source via #materialize. The #takeUntil after reduce makes sure, that the reduce value is not emitted downstream. At last #switchIfEmpty will convert the onComplete event from the "last" value to another value, because the observable did not emit any values.

注意:假设是,所有值都从一个线程发出同步.

NOTE: assumption is, that all values emit sync from one thread.

@Test
public void takeWhileReduce() {
    TestScheduler scheduler = new TestScheduler();
    PublishSubject<Integer> source = PublishSubject.create();

    Observable<Long> publish = source.publish(
            multicast -> {
                return multicast.flatMap(
                        o -> {
                            return Observable.interval(0, 500, TimeUnit.MILLISECONDS, scheduler) //
                                    .takeUntil(multicast)
                                    .reduce(Long::sum)
                                    .toObservable()
                                    // make sure, that the inner stream completes, when the outer stream completes.
                                    // takeUntil must be after reduce, because takeUntil will close the stream and therefore reduce
                                    // will push its value to the subscriber.
                                    .takeUntil(multicast.materialize().filter(Notification::isOnComplete))
                                    // when the upstream is closed, switch over to a fallback observable.
                                    // if you want special handling for the "LAST" value, just provide another fallback observable.
                                    .switchIfEmpty(Observable.just(Long.MAX_VALUE));
                        },
                        1);
            });

    TestObserver<Long> test = publish.test();

    source.onNext(42);

    scheduler.advanceTimeBy(1500, TimeUnit.MILLISECONDS);

    // action - push next value - flatMapped value will complete and push value
    source.onNext(42);
    // assert - values emitted: 0,1,2,3
    test.assertValuesOnly(6L);

    // next value is flatMapped
    scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
    // action - push next value - flatMapped value will complete and push value
    source.onNext(42);

    // assert - values emitted: 0,1,2
    test.assertValuesOnly(6L, 3L);

    scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);

    // action - push next value - flatMapped value will complete and push value
    source.onNext(42);

    // assert - values emitted: 0,1
    test.assertValuesOnly(6L, 3L, 1L);

    scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);

    // action - outer-stream completes
    source.onComplete();

    test.assertComplete().assertValues(6L, 3L, 1L, Long.MAX_VALUE);
}

这篇关于Rxjava:takeUntil 跳过完成事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆