akka-stream + akka-http lifecycle
Question
TL;DR: is it better to materialize a stream per request (i.e. use short-lived streams) or to use a single stream materialization across requests, when I have an outgoing http request as a part of the stream?
Details: I have a typical service that takes an HTTP request, scatters it to several 3rd party downstream services (not controlled by me) and aggregates the results before sending them back. I'm using akka-http for the client implementation and spray for the server (legacy, will move to akka-http over time). Schematically:
request -> map -1-*-> map -> 3rd party http -> map -*-1-> aggregate -> reply
This can be achieved either by materializing a stream per request or by materializing (parts of) the stream once and sharing it across requests.
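The scatter-gather shape in the schematic above can be sketched roughly as follows; `splitRequest`, `callThirdParty` and `aggregate` are hypothetical placeholders for the service-specific logic, and the parallelism value is arbitrary:

```scala
import scala.concurrent.{ExecutionContext, Future}
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}

// Hypothetical per-request scatter-gather pipeline mirroring the schematic.
def handle(incoming: HttpRequest)(
    implicit mat: Materializer, ec: ExecutionContext): Future[HttpResponse] =
  Source.single(incoming)
    .mapConcat(splitRequest)              // 1 -> * : fan out to downstream requests
    .mapAsyncUnordered(4)(callThirdParty) // issue 3rd party http calls in parallel
    .runWith(Sink.seq)                    // * -> 1 : gather all results
    .map(aggregate)                       // combine into a single reply
```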
Materializing per request incurs materialization overhead¹, and it is not clear how to leverage connection pools with it. The problem is described here (many materializations can exhaust the pool). I can wrap the pool in a long-running http stream like here and wrap that in a mapAsync "upstream", but the error handling strategy is not clear to me. When a single request fails and the stream is terminated, would it take down the pool as well? Moreover, it seems I will need to reconcile requests with responses since they are not returned in order.
// example of stream per request
val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)

val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
  Flow[HttpRequest]
    .map(req => req -> UUID.randomUUID()) // the id is irrelevant: single request per stream
    .via(connectionFlow)
    .map { case (response, _) => response }

val result = (1 to 5).foreach { i =>
  Source.single(i)
    .map(i => HttpRequest(...))
    .via(httpFlow)
    .mapAsync(1) {
      // response handling logic
    }
    .runWith(Sink.last)
}
// example of stream per request with a long-running http stream
// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue
def queueRequest(request: HttpRequest): Future[HttpResponse]

val result = (1 to 5).foreach { i =>
  Source.single(i)
    .map(i => HttpRequest(...))
    .mapAsync(1)(queueRequest)
    .mapAsync(1) {
      // somehow reconcile request with response?
      // response handling logic
    }
    .runWith(Sink.last)
}
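For reference, the host-level-API-with-a-queue pattern linked above correlates each request with its response by sending a `Promise` through the pool alongside the request, so no manual reconciliation is needed. A minimal sketch under the assumptions of that documentation page (host, queue size and overflow strategy are placeholders):

```scala
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.{OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}

implicit val system: ActorSystem = ActorSystem()
import system.dispatcher

val poolFlow =
  Http().cachedHostConnectionPool[Promise[HttpResponse]]("example.com")

// Each request travels with its own Promise, so the response is
// correlated for free, regardless of completion order.
val queue = Source
  .queue[(HttpRequest, Promise[HttpResponse])](256, OverflowStrategy.dropNew)
  .via(poolFlow)
  .toMat(Sink.foreach {
    case (Success(resp), p) => p.success(resp)
    case (Failure(e), p)    => p.failure(e)
  })(Keep.left)
  .run()

def queueRequest(request: HttpRequest): Future[HttpResponse] = {
  val responsePromise = Promise[HttpResponse]()
  queue.offer(request -> responsePromise).flatMap {
    case QueueOfferResult.Enqueued =>
      responsePromise.future
    case other =>
      Future.failed(new RuntimeException(s"Queue rejected request: $other"))
  }
}
```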
Sharing a stream across requests has a similar error handling issue - it seems that there are failure modes that can bring down that stream with all requests in flight. The code would be similar to the host-level API, but with the queue fronting the whole stream.
Which way is better in this case?
I did try to implement both solutions, but there are many design choices at every stage of implementation, so it seems easy to screw up even on the "right" path.
¹ Although I believe it is negligible, and it is the same way the akka-http server operates.
Answer
In general it is much better to use a single connection Flow and dispatch all of your requests through that single Flow. The primary reason is that a new materialization may actually result in a new connection being formed each time (depending on your connection pool settings).
You are correct that this results in a few complications:
Ordering: By providing a random UUID as the second value of the tuple you pass to the connection flow, you are eliminating your ability to correlate a request with a response. That extra T value in the tuple can be used as a "correlation id" to know which HttpResponse you are getting from the Flow. In your particular example you could use the initial Int from the Range you created:
val responseSource: Source[(Try[HttpResponse], Int), _] =
  Source
    .fromIterator(() => Iterator.range(0, 5))
    .map(i => HttpRequest(...) -> i)
    .via(connectionFlow)
Now each response comes with the original Int value, which you can use to process the response.
Error handling: You are incorrect in stating that "a single request fails and the stream is terminated". A single request failure does NOT necessarily result in the stream failing. Rather, you will simply get a (Failure(exception), Int) value from the connection flow. You now know which Int caused the failure, and you have the exception from the flow.
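Concretely, consuming the responseSource above might look like the following sketch; the logging call is illustrative, and `system` is assumed to be the ActorSystem in scope:

```scala
// Per-element failures surface as plain values, not as stream failures,
// so the pool flow keeps running after an individual request fails.
responseSource.runForeach {
  case (Success(response), i) =>
    // the response correlated with request i
    response.discardEntityBytes()
  case (Failure(ex), i) =>
    // only request i failed; handle or retry it individually
    system.log.warning(s"request $i failed: ${ex.getMessage}")
}
```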