如何在 RxJava (RxScala) 中实现observeLatestOn? [英] How to implement observeLatestOn in RxJava (RxScala)?

查看:45
本文介绍了如何在 RxJava (RxScala) 中实现observeLatestOn?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试实施 ObserveLatestOn 运算符(实际上是 RxScala).

I'm trying to implement the ObserveLatestOn operator in RxJava (actually, RxScala).

这个操作符很有用,当我们有一个快速的生产者和一个慢速的订阅者,但订阅者不关心在消费一个项目时丢失的任何项目.

This operator is useful, when we've got a fast producer and a slow subscriber, but the subscriber doesn't care about any items lost while it was consuming an item.

大理石图:

--1---2---3----------5------6--7-8-9------|
--1=========>3===>---5=======>6======>9==>|

= 字符表示订阅者正在执行的长时间运行的工作,> 字符表示刚刚完成的工作.作为使用的规范示例,想象一个需要显示的数据的生产者,以及作为订阅者的数据的屏幕渲染器.渲染需要很长时间,但我们不需要在屏幕上渲染每一步,最后一步就完美了.

The = character represents a long-running work being performed by the subscriber, the > character represents the work just finishing. As the canonical example of usage imagine a producer of some data that need displaying, and a screen renderer of the data as a subscriber. The rendering takes quite long, but we don't need to render every step on the screen, just the last one is perfectly good.

在上面的弹珠图中,生产者发出信号1,订阅者开始处理,耗时较长.同时,生产者发出 2 和 3,之后订阅者才完成工作.它看到生产者发出的最后一项是 3,所以它开始处理它.这很快,同时没有新项目产生,因此订阅者可以休息.然后,5 到了,故事以同样的方式继续.

In the above marble diagram, the producer signals 1. The subscriber begins to process it, and it takes a long time. Meanwhile, the producer emits 2 and 3, and it isn't after that that the subscriber finishes the work. It sees that the last item emitted by the producer was 3, so it begins processing that. That is quick, no new item has been produced meanwhile, so the subscriber can rest. Then, 5 arrives and the story goes on in the same manner.

我花了几个小时试图实现这个看似简单的运算符,但我仍然不满意.运算符的本质表明它应该是一个异步的,它应该在与接收它们不同的调度程序上发出它的项目.但同时,我当然不希望一个线程在没有工作要做的情况下被一个worker占用.

I've spent hours trying to implement this seemingly simple operator, but I'm still not satisfied. The very nature of the operator indicates that it should be an asynchronous one, it should emit its items on a different scheduler than it receives them. But at the same time, of course I don't want to have a thread occupied by a worker while there is no work to be done.

这是我目前想到的:

def observeLatestOn[T](o: Observable[T], scheduler: Scheduler): Observable[T] = {
  @volatile var maybeNextItem: Option[Notification[T]] = None
  @volatile var isWorkScheduled = false
  val itemsQueueLock = new Object()

  Observable(subscriber ⇒ {
    def signalToSubscriber(materializedItem: Notification[T]): Unit = {
      materializedItem match {
        case OnNext(item) ⇒ subscriber onNext item
        case OnError(error) ⇒ subscriber onError error
        case OnCompleted ⇒ subscriber.onCompleted()
      }
    }

    def queueItem(item: Notification[T]): Unit = {
      val worker = scheduler.createWorker

      val shouldScheduleWork = itemsQueueLock synchronized {
        val result = !isWorkScheduled
        maybeNextItem = Some(item)
        isWorkScheduled = true
        result
      }

      if (shouldScheduleWork) {
        worker.scheduleRec {
          val maybeNextItemToSignal = itemsQueueLock synchronized {
            val result = maybeNextItem
            if (result.isEmpty) {
              worker.unsubscribe()
              isWorkScheduled = false
            }
            maybeNextItem = None
            result
          }

          maybeNextItemToSignal foreach signalToSubscriber
        }
      }
    }

    o.takeWhile(_ ⇒ !subscriber.isUnsubscribed).subscribe(
      next ⇒ queueItem(OnNext(next)),
      error ⇒ queueItem(OnError(error)),
      () ⇒ queueItem(OnCompleted)
    )
  })
}

它似乎有效,但我对没有竞争条件或死锁没有信心.另外,我不确定解决方案是否可以变得更简单.我也一直在考虑另一种方法,比如

It seems to work, but I don't feel confident that there are no race conditions or deadlocks. Also, I'm not sure if the solution could perhaps be made simpler. I've also been thinking of another approaches, like

  • 巧妙使用OperatorDebounceWithSelector
  • 一次只请求一个项目的 observable 的组合,observeOnonBackpressureBuffer(1)

我也不知道如何为此编写确定性单元测试.scheduleRec 调度的工作在与 TestScheduler 一起使用时不能被中断,我需要使用一个真正在不同线程上工作的调度程序.我发现很难为多线程代码的竞争条件编写正确的单元测试.

I also don't know how to write deterministic unit tests for this. The work scheduled by scheduleRec can't be interrupted when used with TestScheduler, I need to use a scheduler that really works on a different thread. I find it hard to write correct unit tests for race conditions of multi-threaded code.

所以,问题仍然存在:我的解决方案是否正确?有没有更简单、更好或更正确的方法来解决这个问题?以及如何测试它的正确性?

So, the question remains: Is my solution correct? Is there any simpler, better or more correct approach to this? And how to test it's correctness?

推荐答案

我推荐使用 lift 来实现这个操作符.这是我的解决方案:

I recommend that using lift to implement this operator. Here is my solution:

package object ObservableEx {

  implicit class ObserveLatestOn[T](val o: Observable[T]) {

    def observeLatestOn(scheduler: Scheduler): Observable[T] = {
      o.lift { (child: Subscriber[T]) =>
        val worker = scheduler.createWorker
        child.add(worker)

        val parent = new Subscriber[T] {

          private val lock = new AnyRef

          // protected by "lock"
          private var latest: Notification[T] = null
          // protected by "lock"
          // Means no task runs in the worker
          private var idle = true

          private var done = false

          override def onStart(): Unit = {
            request(Long.MaxValue)
          }

          override def onNext(v: T): Unit = {
            if (!done) {
              emit(OnNext(v))
            }
          }

          override def onCompleted(): Unit = {
            if (!done) {
              done = true
              emit(OnCompleted)
            }
          }

          override def onError(e: Throwable): Unit = {
            if (!done) {
              done = true
              emit(OnError(e))
            }
          }

          def emit(v: Notification[T]): Unit = {
            var shouldSchedule = false
            lock.synchronized {
              latest = v
              if (idle) {
                // worker is idle so we should schedule a task
                shouldSchedule = true
                // We will schedule a task, so the worker will be busy
                idle = false
              }
            }
            if (shouldSchedule) {
              worker.schedule {
                var n: Notification[T] = null
                var exit = false
                while (!exit) {
                  lock.synchronized {
                    if (latest == null) {
                      // No new item arrives and we are leaving the worker, so set "idle"
                      idle = true
                      exit = true
                    } else {
                      n = latest
                      latest = null
                    }
                  }
                  if (!exit) {
                    n.accept(child)
                  }
                }
              }
            }
          }
        }

        child.add(parent)

        parent
      }
    }
  }

}

还有一个单元测试

import ObservableEx.ObserveLatestOn

@Test
def testObserveLatestOn(): Unit = {
  val scheduler = TestScheduler()
  val xs = mutable.ArrayBuffer[Long]()
  var completed = false
  Observable.interval(100 milliseconds, scheduler).take(10).observeLatestOn(scheduler).subscribe(v => {
    scheduler.advanceTimeBy(200 milliseconds)
    xs += v
  },
    e => e.printStackTrace(),
    () => completed = true
  )
  scheduler.advanceTimeBy(100 milliseconds)
  assert(completed === true)
  assert(xs === List(0, 2, 4, 6, 8))
}

这篇关于如何在 RxJava (RxScala) 中实现observeLatestOn?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆