如何使用反压溢出策略创建Akka-Http客户端? [英] How to create akka-http client with backpressure overflow strategy?
问题描述
我有未定数量的Akka-http客户端流从http服务下载数据。我使用Akka-http主机级连接池,因为我希望自定义池,因为有长时间运行的请求通过它。
因为客户端的数量是未定义的和动态的,所以我不知道如何配置连接池(max-open-request/max-connections)。此外,我可能希望连接池较小(少于客户端数量),以不损害带宽。
因此,我想设置一个客户端流,以便对池的新连接和请求进行反向压力:
1.这是否意味着我将需要一个具体化的客户端流?
2.如何实现所需数量的客户端流,以便如果没有可用的连接(来自下游的需求),请求将被回压。
我的第一次尝试是Source.single pattern,但是此方法可能会超过max-open-request并抛出异常,因为它在每次向服务器发送请求时都会创建一个新的流实例。
我的第二次尝试是Source.Queue,此方法创建了一个所有请求都入队的流:但是,尽管SourceQueue的Overflow Strategy Backpressed不起作用,并且当它超过max-connection或max-open-request时,Akka-http抛出一个exception
我可以使用host-level streaming fashion实现反压吗 并使用MergeHub拥有一个客户端流并添加新请求?
这是我的解决方案:
private lazy val poolFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Http.HostConnectionPool] =
Http().cachedHostConnectionPool[Promise[HttpResponse]](host.split("http[s]?://").tail.head, port, connectionPoolSettings)
val ServerSink =
poolFlow.async.toMat(Sink.foreach({
case ((Success(resp), p)) => p.success(resp)
case ((Failure(e), p)) => p.failure(e)
}))(Keep.left)
// Attach a MergeHub Source to the consumer. This will materialize to a
// corresponding Sink.
val runnableGraph: RunnableGraph[Sink[(HttpRequest, Promise[HttpResponse]), NotUsed]] =
MergeHub.source[(HttpRequest, Promise[HttpResponse])](perProducerBufferSize = 16).to(ServerSink)
val toConsumer: Sink[(HttpRequest, Promise[HttpResponse]), NotUsed] = runnableGraph.run()
protected[akkahttp] def executeRequest[T](httpRequest: HttpRequest, unmarshal: HttpResponse => Future[T]): Future[T] = {
val responsePromise = Promise[HttpResponse]()
Source.single((httpRequest -> responsePromise)).runWith(toConsumer)
responsePromise.future.flatMap(handleHttpResponse(_, unmarshal))
)
}
推荐答案
我希望我正确理解了您的问题,所以这是我的解决方案(改编自Akka Docs):
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.OverflowStrategy
import akka.stream.QueueOfferResult.Enqueued
import akka.stream.scaladsl.{Sink, Source, SourceQueueWithComplete}
import com.typesafe.config.ConfigFactory
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
class Service(implicit sys: ActorSystem, ec: ExecutionContext) {
private val maxOffers = 256
private val bufferSize = ConfigFactory.load().getInt("akka.http.host-connection-pool.max-open-requests") // default is 32
private val poolClientFlow = Http().cachedHostConnectionPool[Promise[HttpResponse]]("localhost", 7000)
private val queue: SourceQueueWithComplete[(HttpRequest, Promise[HttpResponse])] =
Source
.queue(bufferSize, OverflowStrategy.backpressure, maxOffers)
.via(poolClientFlow)
.to(Sink.foreach({
case (Success(r), p) => p.success(r)
case (Failure(e), p) => p.failure(e)
}))
.run()
def makeRequest: Future[String] = {
val response = Http().singleRequest(HttpRequest().withUri("http://localhost:7000/tommy"))
response.map(Unmarshal(_)).flatMap(_.to[String])
}
def makeRequestBackpressured: Future[String] = {
val promise = Promise[HttpResponse]()
val request = HttpRequest().withUri("/tommy")
val response = queue.offer(request -> promise).flatMap {
case Enqueued => promise.future
case other => Future.failed(new RuntimeException(s"Queue offer error: $other"))
}
response.map(Unmarshal(_)).flatMap(_.to[String])
}
}
此处makeRequest
将在32次并行查询后失败,但makeRequestBackpressured
将使用Source.queue
中配置的反压策略
这篇关于如何使用反压溢出策略创建Akka-Http客户端?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!