使用 akka-stream 两次使用源代码 [英] Use source twice with akka-stream
问题描述
我正在将 Play 框架用于我构建的 Web 应用程序.Play 2.5 使用 Akka Stream API 来允许流式传输请求/响应.
I'm using the Play framework for a web application I built. Play 2.5 uses the Akka Stream API to allow streaming of request/response.
我有一个端点,可以将传入的文件直接流式传输到 Google 云端硬盘.
I have an endpoint where an incoming file is streamed directly to Google Drive.
我定义了一个看起来像这样的 BodyParser
:
I define a BodyParser
that looks like that:
BodyParser("toDrive") { request =>
Accumulator.source[ByteString].mapFuture { source =>
Future.successful(Right("Done"))
}
}
我使用源 (Source[ByteString, _]
) 并将其输入到一个 StreamedBody
中,我将其与提供的 WSClient
一起使用播放.
I use the source (Source[ByteString, _]
) and feed it into a StreamedBody
that I use with the WSClient
provided by Play.
我想使用给定的 Source
并使用 WSClient
进行两个不同的 HTTP 调用.
I would like to use the given Source
and use for two different HTTP call with the WSClient
.
我通过将相同的 Source
传递给两个不同的 WSClient
调用来尝试这种幼稚的方法,但它失败了.我认为解决我的问题的方法是广播.
I tried the naive approach by passing the same Source
into two different WSClient
call, but it failed. I think the solution to my problem is broadcasting.
我想利用源代码创建 2 个源代码,供我的 WSClient
使用.
I want to take what's coming out of the source to create 2 sources to be used by my WSClient
.
我还在玩Source
、Flow
和Sink
.我正在努力理解这一切.
I'm still playing with Source
, Flow
and Sink
. I'm trying to make sense of it all.
推荐答案
更新的解决方案:
Accumulator[ByteString, Either[Result, String]] {
val s1 = Sink
.asPublisher[ByteString](fanout = false)
.mapMaterializedValue(Source.fromPublisher)
.mapMaterializedValue { source =>
//do what you need with source
Future.successful(Right("result 1"))
}
val s2 = Sink
.asPublisher[ByteString](fanout = false)
.mapMaterializedValue(Source.fromPublisher)
.mapMaterializedValue { source =>
//do what you need with source
Future.successful(Right("result 2"))
}
def combine(val1: Future[Either[Result, String]],
val2: Future[Either[Result, String]]): Future[Either[Result, String]] = {
for {
res1 <- val1
res2 <- val2
} yield {
// do something with your result
res1.right.flatMap(val1 => res2.right.map(val2 => val1 + val2))
}
}
Sink.fromGraph(GraphDSL.create(s1, s2)(combine) { implicit b => (sink, sink2) =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[ByteString](2))
broadcast ~> sink
broadcast ~> sink2
SinkShape(broadcast.in)
})
}
做一点解释(AFAIK).我创建了 2 个水槽并将它们组合在一个水槽后面.Accumulator.apply
需要 1 个 Sink[E, Future[A]]
.BodyParser
迫使我使用 ByteString
作为 E
这是输入的数据的 type
水槽.
To give a little explanation (AFAIK). I create 2 sink and combine them behind a single one. The Accumulator.apply
needs 1 Sink[E, Future[A]]
. The BodyParser
forces me to use ByteString
as E
which is the the type
of data that goes in the sink.
所以 2 个接收器接收 ByteString
并具体化为 Future[String]
.我将Sink
转换为Source
,因为我使用的API (WsClient) 可以将Source
作为主体.这个 API 给了我一个 Future[HttpResponse]
(为了解决方案,我把它简化为一个 Future[String]
但你可以做任何你想做的事情
So 2 sinks that takes ByteString
in and materialize as a Future[String]
. I convert the Sink
as a Source
because the API I use (WsClient) can take a Source
as a body. This API gives me a Future[HttpResponse]
(for the sake of the solution, I've simplified this to a Future[String]
but you could do whatever you want in there.
现在这是 akka-streams
API 发挥作用的地方.我强烈建议您查看 文档 以获得更好的理解.话虽如此,在这里,我使用 GraphDSL API 将我的 2 个接收器组合在一个接收器后面.任何进入暴露接收器的 ByteString
都会被发送到 2 个内部接收器.
Now this is where the akka-streams
API comes into play. I strongly suggest that you look at the documentation to get a better understanding. With that said, here, I used the GraphDSL API to combine my 2 sink behind a single one. Any ByteString
that comes into the exposed sink is sent into the 2 inner sinks.
注意:有一个方便的 Sink.combine
函数,它接受 n
个流并将它们组合在一个之后.但是使用这个函数意味着 丢失物化值(在本例中为Future[String]
)
Note: there is a convenient Sink.combine
function that takes n
streams and combine them behind one. But using this function means loosing the materialized value (in this case, Future[String]
)
下面提出的原始解决方案无法正常工作.它只是将数据发送到源之一.
Play Accumulator
也可以通过给它一个 Sink
来创建.
The Play Accumulator
can also be created by giving it a Sink
.
我使用了这种方法,到目前为止这似乎有效:
I used this approach and this seems to be working so far:
BodyParser("toDrive") { request =>
def sourceToFut(src: Source): Future[T] = ???
Accumulator[ByteString, Either[Result, T]] {
Sink
.asPublisher[ByteString](fanout = true)
.mapMaterializedValue(Source.fromPublisher)
.mapMaterializedValue { source =>
val upload1Fut = sourceToFut(source)
val upload2Fut = sourceToFut(source)
for {
file1 <- upload1Fut
file2 <- upload2Fut
} yield {
(file1, file2)
}
}
}
}
与我的初始方法相比,唯一有效的变化是我自己创建了 Sink
并允许 fanout
这样我就可以在两个不同的 中使用源两次>WSClient
调用.
The only effective changes this has compared to my initial approach is that I create the Sink
myself and allow fanout
so I can use the source twice in two different WSClient
call.
你怎么看@expert?
What do you think @expert?
这篇关于使用 akka-stream 两次使用源代码的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!