如何创建可以在以后通过方法调用接收元素的Source? [英] How to create a Source that can receive elements later via a method call?
本文介绍了如何创建可以在以后通过方法调用接收元素的Source?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我想创建一个 Source
,然后在其上推送元素,例如:
I would like to create a Source
and later push elements on it, like in:
val src = ... // create the Source here
// and then, do something like this
pushElement(x1, src)
pushElement(x2, src)
推荐的方法是什么?
谢谢!
推荐答案
可以通过三种方式实现:
There are three ways this can be achieved:
1。使用SourceQueue发布实现
您可以使用 Source.queue
将Flow变为 SourceQueue
:
You can use Source.queue
that materializes the Flow into a SourceQueue
:
case class Weather(zipCode : String, temperature : Double, raining : Boolean)
val bufferSize = 100
//if the buffer fills up then this strategy drops the oldest elements
//upon the arrival of a new element.
val overflowStrategy = akka.stream.OverflowStrategy.dropHead
val queue = Source.queue(bufferSize, overflowStrategy)
.filter(!_.raining)
.to(Sink foreach println)
.run() // in order to "keep" the queue Materialized value instead of the Sink's
queue offer Weather("02139", 32.0, true)
2。用Actor发布实现
有类似的问题和答案,要点是您将流具体化为ActorRef并将消息发送给该ref:
There is a similar question and answer here, the gist being that you materialize the stream as an ActorRef and send messages to that ref:
val ref = Source.actorRef[Weather](Int.MaxValue, fail)
.filter(!_.raining)
.to(Sink foreach println )
.run() // in order to "keep" the ref Materialized value instead of the Sink's
ref ! Weather("02139", 32.0, true)
3。类似地,您可以显式创建一个包含消息缓冲区的Actor,使用该Actor创建一个Source,然后将其发送给Actor
如答案此处:
object WeatherForwarder {
def props : Props = Props[WeatherForwarder]
}
//see provided link for example definition
class WeatherForwarder extends Actor {...}
val actorRef = actorSystem actorOf WeatherForwarder.props
//note the stream has not been instatiated yet
actorRef ! Weather("02139", 32.0, true)
//stream already has 1 Weather value to process which is sitting in the
//ActorRef's internal buffer
val stream = Source(ActorPublisher[Weather](actorRef)).runWith{...}
这篇关于如何创建可以在以后通过方法调用接收元素的Source?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文