Akka Flow 在通过连接池发出 http 请求时挂起 [英] Akka Flow hangs when making http requests via connection pool

查看:29
本文介绍了Akka Flow 在通过连接池发出 http 请求时挂起的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 Akka 2.4.4 并尝试从 Apache HttpAsyncClient 迁移(未成功).

I'm using Akka 2.4.4 and trying to move from Apache HttpAsyncClient (unsuccessfully).

以下是我在项目中使用的代码的简化版本.

Below is simplified version of code that I use in my project.

问题是,如果我向流发送超过 1-3 个请求,它就会挂起.到目前为止,经过 6 个小时的调试,我什至无法找到问题所在.我在 Decider 中没有看到异常、错误日志、事件.什么都没有:)

The problem is that it hangs if I send more than 1-3 requests to the flow. So far after 6 hours of debugging I couldn't even locate the problem. I don't see exceptions, error logs, events in Decider. NOTHING :)

我尝试将 connection-timeout 设置减少到 1 秒,认为它可能正在等待服务器的响应,但没有帮助.

I tried reducing connection-timeout setting to 1s thinking that maybe it's waiting for response from the server but it didn't help.

我做错了什么?

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Referer
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Supervision.Decider
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorAttributes, Supervision}
import com.typesafe.config.ConfigFactory

import scala.collection.immutable.{Seq => imSeq}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.Try

object Main {

  implicit val system = ActorSystem("root")
  implicit val executor = system.dispatcher
  val config = ConfigFactory.load()

  private val baseDomain = "www.google.com"
  private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config))

  private val decider: Decider = {
    case ex =>
      ex.printStackTrace()
      Supervision.Stop
  }

  private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] =

    Source.fromIterator(() => items.toIterator)
      .via(poolClientFlow)
      .log("Logger")(log = myAdapter)
      .recoverWith {
        case ex =>
          println(ex)
          null
      }
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.seq)
      .map { v =>
        println(s"Got ${v.length} responses in Flow")
        v.asInstanceOf[Seq[(Try[HttpResponse], T)]]
      }

  def main(args: Array[String]) {

    val headers = imSeq(Referer("https://www.google.com/"))
    val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID"
    val requests = List.fill(10)(reqPair)
    val qwe = sendMultipleRequests(requests).map { case responses =>
      println(s"Got ${responses.length} responses")

      system.terminate()
    }

    Await.ready(system.whenTerminated, Duration.Inf)
  }
}

还有 代理支持 ?似乎也不适合我.

Also what's up with proxy support ? Doesn't seem to work for me either.

推荐答案

您需要完全使用响应的主体,以便连接可用于后续请求.如果你根本不关心响应实体,那么你可以把它排到一个Sink.ignore,像这样:

You need to consume the body of the response fully so that the connection is made available for subsequent requests. If you don't care about the response entity at all, then you can just drain it to a Sink.ignore, something like this:

resp.entity.dataBytes.runWith(Sink.ignore)

在默认配置下,当使用主机连接池时,最大连接数设置为 4.每个池都有自己的队列,请求会在其中等待一个打开的连接可用.如果该队列超过 32(默认配置,可以更改,必须是 2 的幂),那么您将开始看到失败.在您的情况下,您只执行 10 个请求,因此您不会达到该限制.但是,通过不使用响应实体,您不会释放连接,而其他一切只是在后面排队,等待连接释放.

By the default config, when using a host connection pool, the max connections is set to 4. Each pool has it's own queue where requests wait until one of the open connections becomes available. If that queue ever goes over 32 (default config, can be changed, must be a power of 2) then yo will start seeing failures. In your case, you only do 10 requests, so you don't hit that limit. But by not consuming the response entity you don't free up the connection and everything else just queues in behind, waiting for the connections to free up.

这篇关于Akka Flow 在通过连接池发出 http 请求时挂起的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆