如何将基于回调的API转换为基于Observable的API? [英] How to convert callback based API into one based on Observable?

查看:116
本文介绍了如何将基于回调的API转换为基于Observable的API?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用的库使用回调对象发出一系列消息对象。

The library I'm using emits a series of Message objects using callback object.

interface MessageCallback {
    onMessage(Message message);
}

使用一些 libraryObject.setCallback添加回调( MessageCallback)调用并使用非阻塞 libraryObject.start()方法调用启动该进程。

The callback is added using some libraryObject.setCallback(MessageCallback) call and the process is started using non-blocking libraryObject.start() method call.

创建 Observable< Message> 会发出这些对象的最佳方法是什么?

What is the best way of creating an Observable<Message> that will emit those objects?

如果 libraryObject.start()被阻止怎么办?

推荐答案

我认为你需要这样的东西(在scala中给出的例子)

I think you need something like this (example given in scala)

import rx.lang.scala.{Observable, Subscriber}

case class Message(message: String)

trait MessageCallback {
  def onMessage(message: Message)
}

object LibraryObject {
  def setCallback(callback: MessageCallback): Unit = {
    ???
  }

  def removeCallback(callback: MessageCallback): Unit = {
    ???
  }

  def start(): Unit = {
    ???
  }
}

def messagesSource: Observable[Message] =
  Observable((subscriber: Subscriber[Message]) ⇒ {
    val callback = new MessageCallback {
      def onMessage(message: Message) {
        subscriber.onNext(message)
      }
    }
    LibraryObject.setCallback(callback)
    subscriber.add {
      LibraryObject.removeCallback(callback)
    }
  })

至于阻塞/非阻塞 start():通常基于回调的体系结构将回调订阅和进程启动分开。在这种情况下,您可以完全独立于 start()进程创建任意数量的 messageSource s 。你决定是否分叉也是决定性的。你的架构与此不同吗?

As for the blocking/non-blocking start(): Usually callback-based architecture separates callback subscription and the process start. In that case, you can create as many messageSources as you want completely independently of when you start() the process. Also the decision whether you fork it or not is completely upon you. Is your architecture different from this?

你还应该以某种方式完成这个过程。最好的方法是在MessageCallback接口上添加一个 onCompleted 处理程序。如果要处理错误,还要添加 onError 处理程序。现在你看,你刚才宣布RxJava,一个的观察员: - )

You should also handle finishing the process somehow. The best would be to add an onCompleted handler to the MessageCallback interface. If you want to handle errors, also add an onError handler. Now behold, you have just declared the fundamental building stone of RxJava, an Observer :-)

这篇关于如何将基于回调的API转换为基于Observable的API?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆