将回调方法实现转换为akka流Source [英] Converting a callback-method implementation into an akka stream Source
问题描述
我正在与我无法控制的Java库中的数据发布者合作。发布者库使用典型的回调设置。库代码中的某个位置(库是Java,但是我会在scala中描述简洁性):
I am working with a data publisher from a java library that I do not control. The publisher library uses a typical callback setup; somewhere in the library code (the library is java but I will describe in scala for terseness):
type DataType = ???
trait DataConsumer {
def onData(data : DataType) : Unit
}
该库的用户需要编写一个实现 onData
方法的类,并将该类传递给 DataProducer
,库代码如下所示:
The user of the library is required to write a class that implements the onData
method and pass that into a DataProducer
, the library code looks something like:
class DataProducer(consumer : DataConsumer) {...}
DataProducer
有自己的内部线程I每当有另一个要使用的 DataType
对象时,都无法控制并伴随数据缓冲区,即调用 onData
。
The DataProducer
has its own internal thread I cannot control, and accompanying data buffer, that is calling onData
whenever there is another DataType
object to consume.
所以,我的问题是:如何编写一个将原始库模式转换/转换为akka流的层源对象?
So, my question is: how do I write a layer that will convert/translate the original library pattern into an akka stream Source object?
谢谢。
推荐答案
有多种解决方法。一种是使用ActorPublisher: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html#Integrating_with_Actors ,您可以在其中更改回调以便向演员发送消息。根据回调的工作方式,您也许也可以使用mapAsync(将回调转换为Future)。只有在一个请求产生一个回调调用的情况下,这种方法才有效。
There are various ways this can be solved. One is to use an ActorPublisher: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html#Integrating_with_Actors where you can just change the callback so that it sends a message to the actor. Depending how the callback works, you might be able to use mapAsync, too (converting a callback to a Future). That will only work if one request produces exactly one callback call.
这篇关于将回调方法实现转换为akka流Source的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!