Akka HTTP连接池在几个小时后挂起 [英] Akka HTTP Connection Pool Hangs After Couple of Hours

查看:119
本文介绍了Akka HTTP连接池在几个小时后挂起的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个运行了几个小时后挂起的HTTP连接池:

I have an HTTP Connection Pool that hangs after a couple of hours of running:

private def createHttpPool(host: String): SourceQueue[(HttpRequest, Promise[HttpResponse])] = {
    val pool = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host)
    Source.queue[(HttpRequest, Promise[HttpResponse])](config.poolBuffer, OverflowStrategy.dropNew)
      .via(pool).toMat(Sink.foreach {
        case ((Success(res), p)) => p.success(res)
        case ((Failure(e), p)) => p.failure(e)
      })(Keep.left).run
  }

我将以下项排队:

private def enqueue(uri: Uri): Future[HttpResponse] = {
    val promise = Promise[HttpResponse]
    val request = HttpRequest(uri = uri) -> promise

    queue.offer(request).flatMap {
      case Enqueued => promise.future
      case _ => Future.failed(ConnectionPoolDroppedRequest)
    }
}

并像这样解决响应

private def request(uri: Uri): Future[HttpResponse] = {
    def retry = {
      Thread.sleep(config.dispatcherRetryInterval)
      logger.info(s"retrying")
      request(uri)
    }

    logger.info("req-start")
    for {
      response <- enqueue(uri)

      _ = logger.info("req-end")

      finalResponse <- response.status match {
        case TooManyRequests => retry
        case OK => Future.successful(response)
        case _ => response.entity.toStrict(10.seconds).map(s => throw Error(s.toString, uri.toString))
      }
    } yield finalResponse
}

然后,如果Future成功,则始终转换此函数的结果:

The result of this function is then always transformed if the Future is successful:

def get(uri: Uri): Future[Try[JValue]] = {
  for {
    response <- request(uri)
    json <- Unmarshal(response.entity).to[Try[JValue]]
  } yield json
}

一会儿一切正常,然后我在日志中看到的全部都是req-start,没有req-end。

Everything works fine for a while and then all I see in the logs are req-start and no req-end.

我的akka​​配置是这样的:

My akka configuration is like this:

akka {
  actor.deployment.default {
    dispatcher = "my-dispatcher"
  }
}

my-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"

  fork-join-executor {
    parallelism-min = 256
    parallelism-factor = 128.0
    parallelism-max = 1024
  }
}

akka.http {
  host-connection-pool {
    max-connections = 512
    max-retries = 5
    max-open-requests = 16384
    pipelining-limit = 1
  }
}

我不确定这是配置问题还是代码问题。我的并行性和连接数如此之高,因为没有它,我的请求/速率就会很差(我想尽可能快地请求-我还有其他速率限制代码来保护服务器)。

I'm not sure if this is a configuration problem or a code problem. I have my parallelism and connection numbers so high because without it I get very poor req/s rate (I want to request as fast possible - I have other rate limiting code to protect the server).

推荐答案

您没有使用从服务器获取的响应的实体。引用以下文档:

You are not consuming the entity of the responses you get back from the server. Citing the docs below:


使用(或丢弃)请求的实体是强制性的!如果
意外遗留了既未使用也未丢弃的Akka HTTP将假定
传入数据应保持反压,并将通过TCP背压机制停止
传入数据。不管HttpResponse的状态如何,客户端应
消费该实体。

Consuming (or discarding) the Entity of a request is mandatory! If accidentally left neither consumed or discarded Akka HTTP will assume the incoming data should remain back-pressured, and will stall the incoming data via TCP back-pressure mechanisms. A client should consume the Entity regardless of the status of the HttpResponse.

该实体以 Source [ByteString,_] ,该源需要运行以避免资源匮乏。

The entity comes in the form of a Source[ByteString, _] which needs to be run to avoid resource starvation.

如果您不需要读取实体,消耗实体字节的最简单方法是通过使用

If you don't need to read the entity, the simplest way to consume the entity bytes is to discard them, by using

res.discardEntityBytes()

(您可以添加-例如- .future()。map来附加回调(...))。

(you can attach a callback by adding - e.g. - .future().map(...)).

文档的此页面介绍了所有替代方法,包括如何读取字节(如果有)

This page in the docs describes all the alternatives to this, including how to read the bytes if needed.

---编辑

在提供了更多代码/信息之后,很显然该资源消费不是问题。此实现中还有另一个大的危险信号,即retry方法中的 Thread.sleep
这是一个阻塞调用,很可能会使您的基本actor系统的线程基础设施饿死。

After more code/info was provided, it is clear that the resource consumption is not the problem. There is another big red flag in this implementation, namely the Thread.sleep in the retry method. This is a blocking call that is very likely to starve the threading infrastructure of your underlying actor system.

提供了为何如此危险的完整说明。在文档

A full blown explanation of why this is dangerous was provided in the docs.

尝试更改并使用 akka.pattern.after (< a href = http://doc.akka.io/docs/akka/current/scala/futures.html#After rel = nofollow noreferrer> docs )。下面的示例:

Try changing that and using akka.pattern.after (docs). Example below:

def retry = akka.pattern.after(200 millis, using = system.scheduler)(request(uri))

这篇关于Akka HTTP连接池在几个小时后挂起的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆