How to use an Akka Streams SourceQueue with PlayFramework
Problem description
I would like to use a SourceQueue to push elements dynamically into an Akka Streams source.
A Play controller needs a Source in order to stream a result with the chunked method.
Because Play runs that Source with its own Akka Streams Sink under the hood, I can't materialize the source queue myself with a Sink: the source would already be consumed before the chunked method uses it (unless I use the following hack).
I can make it work by pre-materializing the source queue through a reactive-streams publisher, but it's a kind of "dirty hack":
def sourceQueueAction = Action {
  // needs an implicit Materializer in scope and scala.concurrent.duration._ imported
  val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail)
    .toMat(Sink.asPublisher(false))(Keep.both)
    .run()
  // contrived example that pushes elements dynamically
  val tick = Source.tick(0.seconds, 1.second, "tick")
  tick.runForeach(t => queue.offer(t))
  Ok.chunked(Source.fromPublisher(pub))
}
Is there a simpler way to use an Akka Streams SourceQueue with PlayFramework?
Thanks
The solution is to use mapMaterializedValue on the source to get a Future of its materialized queue. The Future completes once Play materializes the source passed to chunked:
def sourceQueueAction = Action {
  val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))
  // runs once Play has materialized queueSource; needs an implicit
  // ExecutionContext and an implicit Materializer in scope
  futureQueue.foreach { queue =>
    Source.tick(0.seconds, 1.second, "tick")
      .runForeach(t => queue.offer(t))
  }
  Ok.chunked(queueSource)
}

// T is the source element type, here String
// M is the materialized value type, here a SourceQueue[String]
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
  val p = Promise[M]()
  val s = src.mapMaterializedValue { m =>
    p.trySuccess(m) // completes the Future the first time the source is materialized
    m
  }
  (s, p.future)
}
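To see why the Future returned by peekMatValue stays pending until something runs the source, here is a minimal sketch that uses only the Scala standard library. Blueprint, peek and PeekDemo are hypothetical stand-ins invented for this illustration (they are not Akka or Play APIs); Blueprint mimics just the one property that matters here, namely that the materialized value is produced only when run() is called.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for an Akka Source: a blueprint whose
// materialized value M is only created when run() is called.
final class Blueprint[M](materialize: () => M) {
  // Same idea as Source.mapMaterializedValue: f is applied at run time.
  def mapMaterializedValue[N](f: M => N): Blueprint[N] =
    new Blueprint(() => f(materialize()))

  def run(): M = materialize()
}

object PeekDemo {
  // Same shape as peekMatValue, written against the stand-in type.
  def peek[M](bp: Blueprint[M]): (Blueprint[M], Future[M]) = {
    val p = Promise[M]()
    val peeked = bp.mapMaterializedValue { m =>
      p.trySuccess(m) // the first run completes the promise; later runs are ignored
      m
    }
    (peeked, p.future)
  }

  val (blueprint, futureQueue) =
    peek(new Blueprint(() => new ConcurrentLinkedQueue[String]()))

  // Nothing has run the blueprint yet, so the future is still pending.
  val completedBeforeRun: Boolean = futureQueue.isCompleted

  // Plays the role of Play running the source passed to Ok.chunked.
  val queue: ConcurrentLinkedQueue[String] = blueprint.run()

  val completedAfterRun: Boolean = futureQueue.isCompleted

  // The future carries the very same instance that run() returned.
  val sameInstance: Boolean = Await.result(futureQueue, 1.second) eq queue
}
```

This is also why the tick producer in the answer is started from inside the Future callback: before Play runs the source there is no queue to offer elements to.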