使用 akka-stream 两次使用源代码 [英] Use source twice with akka-stream

查看:27
本文介绍了使用 akka-stream 两次使用源代码的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在将 Play 框架用于我构建的 Web 应用程序.Play 2.5 使用 Akka Stream API 来允许流式传输请求/响应.

I'm using the Play framework for a web application I built. Play 2.5 uses the Akka Stream API to allow streaming of request/response.

我有一个端点,可以将传入的文件直接流式传输到 Google 云端硬盘.

I have an endpoint where an incoming file is streamed directly to Google Drive.

我定义了一个看起来像这样的 BodyParser:

I define a BodyParser that looks like that:

BodyParser("toDrive") { request =>
  Accumulator.source[ByteString].mapFuture { source =>
    Future.successful(Right("Done"))
  }
}

我使用源 (Source[ByteString, _]) 并将其输入到一个 StreamedBody 中,我将其与提供的 WSClient 一起使用播放.

I use the source (Source[ByteString, _]) and feed it into a StreamedBody that I use with the WSClient provided by Play.

我想使用给定的 Source 并使用 WSClient 进行两个不同的 HTTP 调用.

I would like to use the given Source and use for two different HTTP call with the WSClient.

我通过将相同的 Source 传递给两个不同的 WSClient 调用来尝试这种幼稚的方法,但它失败了.我认为解决我的问题的方法是广播.

I tried the naive approach by passing the same Source into two different WSClient call, but it failed. I think the solution to my problem is broadcasting.

我想利用源代码创建 2 个源代码,供我的 WSClient 使用.

I want to take what's coming out of the source to create 2 sources to be used by my WSClient.

我还在玩SourceFlowSink.我正在努力理解这一切.

I'm still playing with Source, Flow and Sink. I'm trying to make sense of it all.

推荐答案

更新的解决方案:

  Accumulator[ByteString, Either[Result, String]] {
    val s1 = Sink
      .asPublisher[ByteString](fanout = false)
      .mapMaterializedValue(Source.fromPublisher)
      .mapMaterializedValue { source =>
        //do what you need with source
        Future.successful(Right("result 1"))
      }
    val s2 = Sink
      .asPublisher[ByteString](fanout = false)
      .mapMaterializedValue(Source.fromPublisher)
      .mapMaterializedValue { source =>
        //do what you need with source
        Future.successful(Right("result 2"))
      }

    def combine(val1: Future[Either[Result, String]],
                val2: Future[Either[Result, String]]): Future[Either[Result, String]] = {
      for {
        res1 <- val1
        res2 <- val2
      } yield {
        // do something with your result
        res1.right.flatMap(val1 => res2.right.map(val2 => val1 + val2))
      }
    }

    Sink.fromGraph(GraphDSL.create(s1, s2)(combine) { implicit b => (sink, sink2) =>
      import GraphDSL.Implicits._

      val broadcast = b.add(Broadcast[ByteString](2))

      broadcast ~> sink
      broadcast ~> sink2

      SinkShape(broadcast.in)
    })
  }

做一点解释(AFAIK).我创建了 2 个水槽并将它们组合在一个水槽后面.Accumulator.apply 需要 1 个 Sink[E, Future[A]].BodyParser 迫使我使用 ByteString 作为 E 这是输入的数据的 type 水槽.

To give a little explanation (AFAIK). I create 2 sink and combine them behind a single one. The Accumulator.apply needs 1 Sink[E, Future[A]]. The BodyParser forces me to use ByteString as E which is the the type of data that goes in the sink.

所以 2 个接收器接收 ByteString 并具体化为 Future[String].我将Sink 转换为Source,因为我使用的API (WsClient) 可以将Source 作为主体.这个 API 给了我一个 Future[HttpResponse](为了解决方案,我把它简化为一个 Future[String] 但你可以做任何你想做的事情

So 2 sinks that takes ByteString in and materialize as a Future[String]. I convert the Sink as a Source because the API I use (WsClient) can take a Source as a body. This API gives me a Future[HttpResponse] (for the sake of the solution, I've simplified this to a Future[String] but you could do whatever you want in there.

现在这是 akka-streams API 发挥作用的地方.我强烈建议您查看 文档 以获得更好的理解.话虽如此,在这里,我使用 GraphDSL API 将我的 2 个接收器组合在一个接收器后面.任何进入暴露接收器的 ByteString 都会被发送到 2 个内部接收器.

Now this is where the akka-streams API comes into play. I strongly suggest that you look at the documentation to get a better understanding. With that said, here, I used the GraphDSL API to combine my 2 sink behind a single one. Any ByteString that comes into the exposed sink is sent into the 2 inner sinks.

注意:有一个方便的 Sink.combine 函数,它接受 n 个流并将它们组合在一个之后.但是使用这个函数意味着 丢失物化值(在本例中为Future[String])

Note: there is a convenient Sink.combine function that takes n streams and combine them behind one. But using this function means loosing the materialized value (in this case, Future[String])

下面提出的原始解决方案无法正常工作.它只是将数据发送到源之一.

Play Accumulator 也可以通过给它一个 Sink 来创建.

The Play Accumulator can also be created by giving it a Sink.

我使用了这种方法,到目前为止这似乎有效:

I used this approach and this seems to be working so far:

BodyParser("toDrive") { request =>
  def sourceToFut(src: Source): Future[T] = ???

  Accumulator[ByteString, Either[Result, T]] {
    Sink
      .asPublisher[ByteString](fanout = true)
      .mapMaterializedValue(Source.fromPublisher)
      .mapMaterializedValue { source =>
        val upload1Fut = sourceToFut(source)
        val upload2Fut = sourceToFut(source)
        for {
          file1 <- upload1Fut
          file2 <- upload2Fut
        } yield {
          (file1, file2)
        }
      }
  }
} 

与我的初始方法相比,唯一有效的变化是我自己创建了 Sink 并允许 fanout 这样我就可以在两个不同的 中使用源两次>WSClient 调用.

The only effective changes this has compared to my initial approach is that I create the Sink myself and allow fanout so I can use the source twice in two different WSClient call.

你怎么看@expert?

What do you think @expert?

这篇关于使用 akka-stream 两次使用源代码的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆