akka-stream + akka-http lifecycle


Problem description


TLDR: is it better to materialize a stream per request (i.e. use short-lived streams) or to use a single stream materialization across requests, when I have an outgoing http request as a part of the stream?


Details: I have a typical service that takes an HTTP request, scatters it to several 3rd party downstream services (not controlled by me) and aggregates the results before sending them back. I'm using akka-http for client implementation and spray for server (legacy, will move to akka-http over time). Schematically:

request -> map -1-*-> map -> 3rd party http -> map -*-1> aggregate -> reply


This can be achieved either by materializing a stream per request or materializing (parts of) stream once and share it across requests.


Materializing per request incurs materialization overhead[1] and it is not clear how to leverage connection pools with it. The problem is described here (many materializations can exhaust the pool). I can wrap a pool in a long-running http stream like here and wrap in a mapAsync "upstream", but the error handling strategy is not clear to me. When a single request fails and the stream is terminated, would it take down the pool as well? More, it seems I will need to reconcile requests and responses since they are not returned in order.

// example of stream per request

val connectionFlow = Http().cachedHostConnectionPool[UUID](host, port)
val httpFlow: Flow[HttpRequest, Try[HttpResponse], NotUsed] =
    Flow[HttpRequest]
      .map(req => req -> UUID.randomUUID()) // I don't care about id because it's a single request per stream.
      .via(connectionFlow)
      .map { case (response, _) => response }

(1 to 5).foreach { i =>
  Source.single(i)
    .map(i => HttpRequest(...)) // build the request from i
    .via(httpFlow)
    .mapAsync(1) {
       // response handling logic
    }
    .runWith(Sink.last)
}


// example of stream per request with long running http stream

// as defined in http://doc.akka.io/docs/akka-http/current/scala/http/client-side/host-level.html#using-the-host-level-api-with-a-queue
def queueRequest(request: HttpRequest): Future[HttpResponse]

(1 to 5).foreach { i =>
  Source.single(i)
    .map(i => HttpRequest(...)) // build the request from i
    .mapAsync(1)(queueRequest)
    .mapAsync(1) {
       // somehow reconcile request with response?
       // response handling logic
    }
    .runWith(Sink.last)
}
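The reconciliation the comment asks about is what `queueRequest` already does internally: in the docs' host-level queue example, each request is enqueued paired with its own `Promise`, so the caller's `Future` is completed with exactly the matching response, whatever order the pool answers in. A minimal dependency-free sketch of that promise-pairing idea (`fakePool` is a hypothetical stand-in for the pooled connection flow, here deliberately answering out of order):

```scala
import scala.collection.mutable
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val queue = mutable.Buffer.empty[(String, Promise[String])]

// Pair every request with its own Promise; the returned Future can only
// ever be completed with that request's response.
def queueRequest(request: String): Future[String] = {
  val p = Promise[String]()
  queue += (request -> p)
  p.future
}

// Hypothetical stand-in for the pooled connection flow: completes each
// queued promise with the matching response, in reverse order on purpose.
def fakePool(queued: Seq[(String, Promise[String])]): Unit =
  queued.reverse.foreach { case (req, p) => p.success(s"response-to-$req") }

val futures = (1 to 3).map(i => queueRequest(s"req$i"))
fakePool(queue.toSeq) // responses arrive out of order
val results = Await.result(Future.sequence(futures), 1.second)
// results == Seq("response-to-req1", "response-to-req2", "response-to-req3")
```

Because the correlation lives in the promise, no explicit id plumbing is needed at the call site; this is the same design the host-level queue API uses with `Source.queue` in front of the pool flow.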


Sharing a stream across requests raises a similar error-handling issue: there seem to be failure modes that can bring down that stream with all requests in-flight. The code would be similar to the host-level API, but with the queue fronting the whole stream.


Which way is better in this case?


I did try to implement both solutions, but there are many design choices at every stage of implementation, so it seems easy to screw up even on a "right" path.


[1] Although I believe it is negligible, and it is the same way the akka-http server operates.

Recommended answer


In general it is much better to use a single connection Flow and dispatch all of your requests through that single Flow. The primary reason is due to the fact that a new materialization may actually result in a new Connection being formed each time (depending on your connection pool settings).


You are correct that this results in a few complications:


Ordering: By providing a random UUID as the 2nd value in the tuple that you are passing to the connection flow you are eliminating your ability to correlate a request to a response. That extra T value in the tuple can be used as a "correlation id" to know which HttpResponse you are getting from the Flow. In your particular example you could use the initial Int from the Range you created:

val responseSource: Source[(Try[HttpResponse], Int), _] =
  Source
    .fromIterator(() => Iterator.range(0, 5))
    .map(i => HttpRequest(...) -> i)
    .via(connectionFlow)


Now each response comes with the original Int value which you can use to process the response.
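Since the pool may complete responses out of order, the correlation id is what lets you restore request order (or join each response back to its originating request). A minimal sketch, with plain strings standing in for `HttpResponse`:

```scala
import scala.util.{Success, Try}

// Responses as they might emerge from the pool: out of order,
// each tagged with the Int correlation id it was submitted with.
val responses: Seq[(Try[String], Int)] =
  Seq((Success("body-2"), 2), (Success("body-0"), 0), (Success("body-1"), 1))

// Restore request order by sorting on the correlation id.
val inOrder: Seq[Try[String]] = responses.sortBy(_._2).map(_._1)
// inOrder == Seq(Success("body-0"), Success("body-1"), Success("body-2"))
```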


Error Handling: You are incorrect in stating "a single request fails and the stream is terminated". A single request failure DOES NOT necessarily result in the stream failing. Rather, you will simply get a (Failure(exception), Int) value from the connection flow. You now know which Int caused the failure and you have the exception from the flow.
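To make that concrete: a failed request arrives as just another element, a `Failure` alongside the successes, so downstream logic can split them and retry or report only the failed ids. A dependency-free sketch (strings again stand in for `HttpResponse`):

```scala
import scala.util.{Failure, Success, Try}

// Output of the connection flow: one request timed out, the others succeeded.
// The stream itself keeps running; the failure is just another element.
val results: Seq[(Try[String], Int)] = Seq(
  (Success("ok-0"), 0),
  (Failure(new RuntimeException("timeout")), 1),
  (Success("ok-2"), 2)
)

val (failures, successes) = results.partition(_._1.isFailure)
val failedIds = failures.map(_._2)                     // ids to retry
val okBodies  = successes.map { case (t, _) => t.get } // successful bodies
```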
