如何在 PlayFramework 中使用 Akka Streams SourceQueue [英] How to use an Akka Streams SourceQueue with PlayFramework

查看:18
本文介绍了如何在 PlayFramework 中使用 Akka Streams SourceQueue的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想使用 SourceQueue 将元素动态推送到 Akka Stream 源中.播放控制器需要一个 Source 才能使用 chunked 方法流式传输结果.
由于 Play 在幕后使用自己的 Akka Stream Sink,我无法使用 Sink 自己实现源队列,因为源将在被 chunked 方法使用之前被消耗(除非我使用跟随黑客).

I would like to use a SourceQueue to push elements dynamically into an Akka Stream source. Play controller needs a Source to be able to stream a result using the chuncked method.
As Play uses its own Akka Stream Sink under the hood, I can't materialize the source queue myself using a Sink because the source would be consumed before it's used by the chunked method (except if I use the following hack).

如果我使用反应流发布者预先实现源队列,我就能让它工作,但这是一种肮脏的黑客":

I'm able to make it work if I pre-materialize the source queue using a reactive-streams publisher, but it's a kind of 'dirty hack' :

def sourceQueueAction = Action{

    val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()

    //stupid example to push elements dynamically
    val tick = Source.tick(0 second, 1 second, "tick")
    tick.runForeach(t => queue.offer(t))

    Ok.chunked(Source.fromPublisher(pub))
  }

是否有更简单的方法来将 Akka Streams SourceQueue 与 PlayFramework 结合使用?

Is there a simpler way to use an Akka Streams SourceQueue with PlayFramework?

谢谢

推荐答案

解决方案是在源上使用 mapMaterializedValue 以获得其队列物化的未来:

The solution is to use mapMaterializedValue on the source to get a future of its queue materialization :

def sourceQueueAction = Action {
    val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))

    futureQueue.map { queue =>
      Source.tick(0.second, 1.second, "tick")
        .runForeach (t => queue.offer(t))
    }
    Ok.chunked(queueSource)

  }

  //T is the source type, here String
  //M is the materialization type, here a SourceQueue[String]
  def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
    val p = Promise[M]
    val s = src.mapMaterializedValue { m =>
      p.trySuccess(m)
      m
    }
    (s, p.future)
  }

这篇关于如何在 PlayFramework 中使用 Akka Streams SourceQueue的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆