如何使用 Akka HTTP 从多个参与者/网络处理程序正确调用单个服务器? [英] How to properly call a single server from multiple actors / web handlers using Akka HTTP?

查看:26
本文介绍了如何使用 Akka HTTP 从多个参与者/网络处理程序正确调用单个服务器?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个服务(我们称之为服务 A),它使用 Akka Server HTTP 来处理传入的请求.此外,我还有提供多种 Web 服务的第 3 方应用程序(服务 B).服务 A 的目的是转换客户端请求,调用服务 B 的一个或多个 Web 服务,合并/转换结果并将其返回给客户端.

I have a service (let's call it Service A) which uses Akka Server HTTP to handle incoming requests. Also I have 3rd party application (Service B) which provides several web services. The purpose of service A is to transform client requests, call one or multiple web services of service B, merge/transform results and serve it back to a client.

我在某些部分使用 Actors,而在其他部分使用 Future.为了调用服务 B,我使用 Akka HTTP 客户端.

I am using Actors for some parts, and just Future for other. To make a call to Service B, I use Akka HTTP client.

Http.get(actorSystem).singleRequest(HttpRequest.create()
        .withUri("http://127.0.0.1:8082/test"), materializer)
        .onComplete(...)

问题是,每个Service A请求都会创建一个新的流,如果有多个并发连接,会导致akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configure max-open-requests value[32] 错误

The issue is, a new flow is created per each Service A request, and if there are multiple concurrent connections, it results in akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] error

我已经问过这个问题并得到了使用单个 Flow 的建议 如何为多个(10k - 100k)请求正确调用Akka HTTP客户端?

I already asked this question and got a suggestion to use a single Flow How to properly call Akka HTTP client for multiple (10k - 100k) requests?

虽然它适用于来自一个地方的一批请求,但我不知道如何使用来自所有并发请求处理程序的单个 Flow.

While it works for a batch of requests coming from a single place, I don't know how to use a single Flow from all my concurrent request handlers.

正确的Akka 方式"是什么?

What is the correct "Akka-way" to do it?

推荐答案

我认为您可以使用 Source.queue 来缓冲您的请求.下面的代码假设您需要从 3rd 方服务获得答案,因此非常欢迎使用 Future[HttpResponse].通过这种方式,您还可以提供溢出策略来防止资源匮乏.

I think you could use Source.queue to buffer your requests. The code below assume that you need to get the answer from 3rd party service, so having a Future[HttpResponse] is very welcomed. This way you could also provide an overflow strategy to prevent resource starvation.

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.util.{Failure, Success}

import scala.concurrent.ExecutionContext.Implicits.global

implicit val system = ActorSystem("main")
implicit val materializer = ActorMaterializer()
val pool = Http().cachedHostConnectionPool[Promise[HttpResponse]](host = "google.com", port = 80)
val queue = Source.queue[(HttpRequest, Promise[HttpResponse])](10, OverflowStrategy.dropNew)
  .via(pool)
  .toMat(Sink.foreach({
    case ((Success(resp), p)) => p.success(resp)
    case ((Failure(e), p)) => p.failure(e)
  }))(Keep.left)
  .run
val promise = Promise[HttpResponse]
val request = HttpRequest(uri = "/") -> promise

val response = queue.offer(request).flatMap(buffered => {
  if (buffered) promise.future
  else Future.failed(new RuntimeException())
})

Await.ready(response, 3 seconds)

(代码复制自我的 博文)

这篇关于如何使用 Akka HTTP 从多个参与者/网络处理程序正确调用单个服务器?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆