如何使用反压溢出策略创建Akka-Http客户端? [英] How to create akka-http client with backpressure overflow strategy?

查看:23
本文介绍了如何使用反压溢出策略创建Akka-Http客户端?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有未定数量的Akka-http客户端流从http服务下载数据。我使用Akka-http主机级连接池,因为我希望自定义池,因为有长时间运行的请求通过它。

因为客户端的数量是未定义的和动态的,所以我不知道如何配置连接池(max-open-request/max-connections)。此外,我可能希望连接池较小(少于客户端数量),以不损害带宽。

因此,我想设置一个客户端流,以便对池的新连接和请求进行反向压力:

1.这是否意味着我将需要一个具体化的客户端流?

2.如何实现所需数量的客户端流,以便如果没有可用的连接(来自下游的需求),请求将被回压。

我的第一次尝试是Source.single pattern,但是此方法可能会超过max-open-request并抛出异常,因为它在每次向服务器发送请求时都会创建一个新的流实例。

我的第二次尝试是Source.Queue,此方法创建了一个所有请求都入队的流:但是,尽管SourceQueue的Overflow Strategy Backpressed不起作用,并且当它超过max-connection或max-open-request时,Akka-http抛出一个exception

我可以使用host-level streaming fashion实现反压吗 并使用MergeHub拥有一个客户端流并添加新请求?

这是我的解决方案:

  private lazy val poolFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Http.HostConnectionPool] =
    Http().cachedHostConnectionPool[Promise[HttpResponse]](host.split("http[s]?://").tail.head, port, connectionPoolSettings)

  val ServerSink =
    poolFlow.async.toMat(Sink.foreach({
      case ((Success(resp), p)) => p.success(resp)
      case ((Failure(e), p)) => p.failure(e)
    }))(Keep.left)

  // Attach a MergeHub Source to the consumer. This will materialize to a
  // corresponding Sink.
  val runnableGraph: RunnableGraph[Sink[(HttpRequest, Promise[HttpResponse]), NotUsed]] =
  MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16).to(ServerSink)


  val toConsumer: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] = runnableGraph.run()


  protected[akkahttp] def executeRequest[T](httpRequest: HttpRequest, unmarshal: HttpResponse => Future[T]): Future[T] = {
    val responsePromise = Promise[HttpResponse]()
    Source.single((httpRequest -> responsePromise)).runWith(toConsumer)
    responsePromise.future.flatMap(handleHttpResponse(_, unmarshal))
    )
  }

推荐答案

我希望我正确理解了您的问题,所以这是我的解决方案(改编自Akka Docs):

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.OverflowStrategy
import akka.stream.QueueOfferResult.Enqueued
import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete}
import com.typesafe.config.ConfigFactory
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

class Service(implicit sys: ActorSystem, ec: ExecutionContext) {
  private val maxOffers = 256
  private val bufferSize = ConfigFactory.load().getInt("akka.http.host-connection-pool.max-open-requests") // default is 32
  private val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("localhost", 7000)
  private val queue: SourceQueueWithComplete[(HttpRequest, Promise[HttpResponse])] =
    Source
      .queue(bufferSize, OverflowStrategy.backpressure, maxOffers)
      .via(poolClientFlow)
      .to(Sink.foreach({
        case (Success(r), p) => p.success(r)
        case (Failure(e), p) => p.failure(e)
      }))
      .run()

  def makeRequest: Future[String] = {
    val response = Http().singleRequest(HttpRequest().withUri("http://localhost:7000/tommy"))
    response.map(Unmarshal(_)).flatMap(_.to[String])
  }

  def makeRequestBackpressured: Future[String] = {
    val promise = Promise[HttpResponse]()
    val request = HttpRequest().withUri("/tommy")

    val response = queue.offer(request -> promise).flatMap {
      case Enqueued => promise.future
      case other => Future.failed(new RuntimeException(s"Queue offer error: $other"))
    }

    response.map(Unmarshal(_)).flatMap(_.to[String])
  }
}

此处makeRequest将在32次并行查询后失败,但makeRequestBackpressured将使用Source.queue中配置的反压策略

这篇关于如何使用反压溢出策略创建Akka-Http客户端?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆