如何在不丢失发出的项目的情况下暂停 Observable? [英] How can an Observable be paused without losing the items emitted?

查看:64
本文介绍了如何在不丢失发出的项目的情况下暂停 Observable?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 Observable 每秒发出一次滴答声:

I have an Observable that emits ticks every second:

Observable.interval(0, 1, TimeUnit.SECONDS)
    .take(durationInSeconds + 1));

我想暂停这个 Observable,让它停止发出数字,并按需恢复.

I'd like to pause this Observable so it stops emitting numbers, and resume it on demand.

有一些问题:

  • 根据Observable Javadoc,interval 操作符不支持背压
  • 关于 backpressure 的 RxJava 维基有一节关于调用堆栈阻塞作为背压的流量控制替代方案:
  • according to the Observable Javadoc, interval operator doesn't support backpressure
  • the RxJava wiki about backpressure has a section about Callstack blocking as a flow-control alternative to backpressure:

另一种处理过度生产的 Observable 的方法是阻塞调用堆栈(停放管理过度生产的 Observable 的线程).这有悖于反应性"和非阻塞模型的缺点接收.然而,如果有问题的 Observable 位于可以安全阻塞的线程上,这可能是一个可行的选择.目前 RxJava 没有公开任何操作符来促进这一点.

Another way of handling an overproductive Observable is to block the callstack (parking the thread that governs the overproductive Observable). This has the disadvantage of going against the "reactive" and non-blocking model of Rx. However this can be a viable option if the problematic Observable is on a thread that can be blocked safely. Currently RxJava does not expose any operators to facilitate this.

有没有办法暂停 interval Observable?或者我应该使用一些背压支持来实现我自己的滴答" Observable 吗?

Is there a way to pause an interval Observable? Or should I implement my own 'ticking' Observable with some backpressure support?

推荐答案

有很多方法可以做到这一点.例如,您仍然可以使用 interval() 并维护两个附加状态:比如布尔标志暂停"和一个计数器.

There are many ways of doing this. For example, you can still use interval() and maintain two additional states: say boolean flag "paused" and a counter.

public static final Observable<Long> pausableInterval(
  final AtomicBoolean paused, long initial, long interval, TimeUnit unit, Scheduler scheduler) {

  final AtomicLong counter = new AtomicLong();
  return Observable.interval(initial, interval, unit, scheduler)
      .filter(tick -> !paused.get())
      .map(tick -> counter.getAndIncrement()); 
}

然后你只需在某处调用 paused.set(true/false) 来暂停/恢复

Then you just call paused.set(true/false) somewhere to pause/resume

编辑 2016-06-04

上面的解决方案有点问题.如果我们多次重用 observable 实例,它将从上次取消订阅时的值开始.例如:

There is a little problem with a solution above. If we reuse the observable instance multiple times, it will start from the value at the last unsubscribe. For example:

Observable<Long> o = pausableInterval(...)
List<Long> list1 = o.take(5).toList().toBlocking().single();
List<Long> list2 = o.take(5).toList().toBlocking().single();

虽然 list1 应该是 [0,1,2,3,4],list2 实际上是 [5,6,7,8,9].如果不希望出现上述行为,则必须将 observable 设为无状态.这可以通过 scan() 运算符来实现.修改后的版本可以是这样的:

While list1 will be expected [0,1,2,3,4], list2 will be actually [5,6,7,8,9]. If the behavior above is not desired, the observable has to be made stateless. This can be achieved by scan() operator. The revised version can look like this:

  public static final Observable<Long> pausableInterval(final AtomicBoolean pause, final long initialDelay, 
      final long period, TimeUnit unit, Scheduler scheduler) {

    return Observable.interval(initialDelay, period, unit, scheduler)
        .filter(tick->!pause.get())
        .scan((acc,tick)->acc + 1);
  }

或者,如果您不希望依赖 Java 8 和 lambdas,您可以使用 Java 6+ 兼容代码执行以下操作:

Or if you do not wish dependency on Java 8 and lambdas you can do something like this using Java 6+ compatible code:

https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343-L361

这篇关于如何在不丢失发出的项目的情况下暂停 Observable?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆