使用Spray Client进行多个请求时的akka​​超时 [英] akka timeout when using spray client for multiple request

查看:146
本文介绍了使用Spray Client进行多个请求时的akka​​超时的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在akka 2.3.6中使用喷雾1.3.2. (akka仅用于喷雾).
我需要读取巨大的文件,并为每一行发出一个http请求.
我用迭代器逐行读取文件,并为每个项目提出请求. 它在某些行中成功运行,但有时会因以下原因而失败:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://default/user/IO-HTTP#-35162984]] after [60000 ms].
我首先以为我已使服务超载,所以我将"spray.can.host-connector.max-connections"设置为1.它运行速度慢得多,但出现了相同的错误.

这里的代码:

Using spray 1.3.2 with akka 2.3.6. (akka is used only for spray).
I need to read huge files and for each line make a http request.
I read the files line by line with iterator, and for each item make the request. It run successfully for some of the lines but at some time it start to fail with:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://default/user/IO-HTTP#-35162984]] after [60000 ms].
I first thought I overloading the service, so I set the "spray.can.host-connector.max-connections" to 1. It run much slower but I got the same errors.

Here the code:

import spray.http.MediaTypes._
val EdnType = register(
MediaType.custom(
  mainType = "application",
  subType = "edn",
  compressible = true,
  binary = false,
  fileExtensions = Seq("edn")))

val pipeline = (
  addHeader("Accept", "application/json")
  ~> sendReceive
  ~> unmarshal[PipelineResponse])

def postData(data: String) = {
  val request = Post(pipelineUrl).withEntity(HttpEntity.apply(EdnType, data))
  val responseFuture: Future[PipelineResponse] = pipeline(request)
  responseFuture
}

dataLines.map { d =>
  val f = postData(d)
  f.onFailure { case e => println("Error - "+e)} // This is where the errors are display
  f.map { p => someMoreLogic(d, p) }
}

aggrigateResults(dataLines)

我这样做是因为我不需要全部数据,只需要一些聚合即可.

I do it in such way since I don't need the entire data, just some aggregations.

我该如何解决并使它完全异步?

How can I solve this and keep it entirely async?

推荐答案

Akka询问超时是通过firstCompletedOf实现的,因此计时器在初始化询问时启动.

Akka ask timeout is implemented via firstCompletedOf, so the timer starts when the ask is initialized.

您似乎正在做的是为每条线路生成一个Future(在地图过程中)-因此您的所有通话几乎都在同一时间执行.初始化期货时,超时开始计时,但是没有剩余的执行者线程供所有派生的演员进行工作.因此,请求超时.

What you seem to be doing, is spawning a Future for each line (during the map) - so all your calls execute nearly at the same time. The timeouts start counting when the futures are initialized, but there are no executor threads left for all the spawned actors to do their work. Hence the asks time out.

我建议不要使用一次全部"的处理方法,而是采用一种更灵活的方法-与使用迭代器或akka流类似: Github )

Instead of processing "all at once", I would suggest a more flexible approach - somewhat similar to using iteratees, or akka-streams: Work Pulling Pattern. (Github)

您提供已经作为Epic的迭代器.介绍Worker actor,它将执行调用&一些逻辑.如果生成N workers,则最多将同时处理N行(并且处理管道可能涉及多个步骤).这样,您可以确保您不会使执行程序过载,并且不会发生超时.

You provide the iterator that you already have as an Epic. Introduce a Worker actor, which will perform the call & some logic. If you spawn N workers then, there will be at most N lines being processed concurrently (and the processing pipeline may involve multiple steps). This way you can ensure that you are not overloading the executors, and the timeouts shouldn't happen.

这篇关于使用Spray Client进行多个请求时的akka​​超时的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆