通过连接池发出HTTP请求时Akka Flow挂起 [英] Akka Flow hangs when making http requests via connection pool

查看:150
本文介绍了通过连接池发出HTTP请求时Akka Flow挂起的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Akka 2.4.4,并尝试从Apache HttpAsyncClient迁移(未成功)。

I'm using Akka 2.4.4 and trying to move from Apache HttpAsyncClient (unsuccessfully).

下面是我在项目中使用的简化代码。

Below is simplified version of code that I use in my project.

问题是,如果我向该流程发送了1-3个以上的请求,它将挂起。经过六个小时的调试,到目前为止,我什至无法找到问题所在。我没有在 Decider 中看到异常,错误日志和事件。什么都没有:)

The problem is that it hangs if I send more than 1-3 requests to the flow. So far after 6 hours of debugging I couldn't even locate the problem. I don't see exceptions, error logs, events in Decider. NOTHING :)

我试图将 connection-timeout 的设置减小为1s,以为可能是在等待服务器的响应

I tried reducing connection-timeout setting to 1s thinking that maybe it's waiting for response from the server but it didn't help.

我在做什么错了?

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Referer
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Supervision.Decider
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorAttributes, Supervision}
import com.typesafe.config.ConfigFactory

import scala.collection.immutable.{Seq => imSeq}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.Try

object Main {

  implicit val system = ActorSystem("root")
  implicit val executor = system.dispatcher
  val config = ConfigFactory.load()

  private val baseDomain = "www.google.com"
  private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config))

  private val decider: Decider = {
    case ex =>
      ex.printStackTrace()
      Supervision.Stop
  }

  private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] =

    Source.fromIterator(() => items.toIterator)
      .via(poolClientFlow)
      .log("Logger")(log = myAdapter)
      .recoverWith {
        case ex =>
          println(ex)
          null
      }
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.seq)
      .map { v =>
        println(s"Got ${v.length} responses in Flow")
        v.asInstanceOf[Seq[(Try[HttpResponse], T)]]
      }

  def main(args: Array[String]) {

    val headers = imSeq(Referer("https://www.google.com/"))
    val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID"
    val requests = List.fill(10)(reqPair)
    val qwe = sendMultipleRequests(requests).map { case responses =>
      println(s"Got ${responses.length} responses")

      system.terminate()
    }

    Await.ready(system.whenTerminated, Duration.Inf)
  }
}

< a href = https://github.com/akka/akka/blob/master/akka-http-core/src/main/resources/reference.conf#L146-L150 rel = noreferrer>代理支持?似乎也不适合我。

推荐答案

您需要完全消耗响应的主体,以便连接可用于后续请求。如果您根本不关心响应实体,则可以将其排放到 Sink.ignore ,如下所示:

You need to consume the body of the response fully so that the connection is made available for subsequent requests. If you don't care about the response entity at all, then you can just drain it to a Sink.ignore, something like this:

resp.entity.dataBytes.runWith(Sink.ignore)

通过默认配置,使用主机连接池时,最大连接数设置为4。每个池都有其自己的队列,请求等待直到打开的连接之一变为可用为止。如果该队列超过32(默认配置,可以更改,必须为2的幂),那么您将开始看到故障。在您的情况下,您只执行10个请求,所以您没有达到该限制。但是,通过不使用响应实体,您不会释放连接,其他所有东西都只是排在后面,等待连接释放。

By the default config, when using a host connection pool, the max connections is set to 4. Each pool has it's own queue where requests wait until one of the open connections becomes available. If that queue ever goes over 32 (default config, can be changed, must be a power of 2) then yo will start seeing failures. In your case, you only do 10 requests, so you don't hit that limit. But by not consuming the response entity you don't free up the connection and everything else just queues in behind, waiting for the connections to free up.

这篇关于通过连接池发出HTTP请求时Akka Flow挂起的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆