Akka HTTP:将来阻止会阻止服务器 [英] Akka HTTP: Blocking in a future blocks the server

查看:85
本文介绍了Akka HTTP:将来阻止会阻止服务器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用Akka HTTP对我的请求进行基本身份验证。
碰巧我需要外部资源进行身份验证,因此我必须对该资源进行调用。



这需要一些时间,并且在处理过程中,似乎我的API其余部分已被阻止,正在等待此调用。
我用一个非常简单的示例复制了它:

  //二手调度程序:
隐式val系统= ActorSystem()
隐式val执行器= system.dispatcher
隐式val实现器= ActorMaterializer()


val路由=
(post&实体(as [String])){e =>
完成{
未来{
Thread.sleep(5000)
e
}
}
}〜
(get& path(Segment)){r =>
完成{
get
}
}

如果我发布到日志端点,则我的get端点也停留在等待日志端点规定的5秒钟。



这是预期的行为吗?是,如何在不阻塞我的整个API的情况下进行阻塞操作?

解决方案

您观察到的是预期的行为–当然,它是很坏。很好的是,存在已知的解决方案和最佳实践来防范它。在这个答案中,我想花点时间来解释这个问题,简短,冗长,然后再深入–享受阅读!



简短答案不要阻塞路由基础结构!,请始终使用专用的调度程序来阻塞操作!



原因观察到的症状:问题是您使用 context.dispatcher 作为执行期货的调度程序。路由基础结构使用同一调度程序(简单来说就是线程束)来实际处理传入的请求-因此,如果阻塞所有可用线程,最终将使路由基础结构饿死。 (有待辩论和基准测试的事情是,如果Akka HTTP可以防止这种情况发生,我将其添加到我的研究待办事项清单中。)



必须对阻塞进行处理尤其要注意不要影响同一调度程序的其他用户(这就是为什么我们使将执行分离到不同调度程序如此简单)的原因,如Akka docs部分中所述:



然后我们开始[b]加载,这会导致这些线程的阻塞–您可以看到早期的线程 default-dispatcher-闲置后2,3,4进入阻塞状态b之前。我们还观察到池在增长–新线程在 default-dispatcher-18,19,20,21 ...启动,但是它们立即进入睡眠状态(!)–我们在这里浪费了宝贵的资源!



此类启动线程的数量取决于默认的调度程序配置,但可能不会超过50个左右。由于我们只触发了2k个阻塞操作,因此我们使整个线程池都饿死了–阻塞操作占主导地位,从而路由基础设施没有可用的线程来处理其他请求–非常糟糕!



让我们为此做些什么(这是Akka的最佳实践-始终隔离如下所示的阻塞行为):



2) [好!] 调度程序行为良好的结构化代码/调度程序



在您的 application.conf 配置此专用于阻止行为的调度程序:

  my-blocking-dispatcher {
type =调度程序
执行者=线程池执行者
线程池执行者{
//在2.4.2之前的Akka中:
core-pool-size-min = 16
core-pool-size-max = 16
max-pool-min-min = 16
max-pool-max-max = 16
//或在Akka 2.4中.2+
固定池大小= 16
}
吞吐量= 100
}

您应该在



因此,最初,正常的请求可以由默认的调度程序轻松处理,您可以在此处看到几条绿线-这是实际的执行情况(我并没有真正使服务器承受沉重的负担,因此它大部分都是空闲的)。 / p>

现在,当我们开始发布阻止操作时, my-blocking-dispatcher-* 会启动到配置的线程数。它处理在那里的所有睡眠。同样,在这些线程上一段时间没有任何反应之后,它将关闭它们。如果我们要用另一堆阻塞来攻击服务器,则池将启动新线程,这些新线程将负责sleep()-处理它们,但与此同时–我们并没有在只呆在那里,没做什么。



使用此设置时,普通GET请求的吞吐量没有受到影响,它们仍然很高兴地在(仍然非常免费的)默认调度程序上得到服务。



这是处理反应性应用程序中任何类型的阻塞的推荐方法。通常将其称为笨拙(或隔离)应用程序的不良行为,在这种情况下,不良行为是睡眠/阻止。



3)[解决方法] 正确应用阻止时的调度程序行为



在此示例中,我们使用



您会注意到创建了很多个新线程,这是因为阻止提示哦,这将被阻止,所以我们需要更多线程。这导致我们被阻塞的总时间小于1)示例中的总时间,但是在阻塞操作完成之后,我们有数百个线程什么也不做...当然,它们最终将被关闭(FJP会这样做),但有一段时间,我们将运行大量(不受控制的)线程,与2)解决方案相反,在这里,我们确切知道有多少线程专用于阻塞行为。



总结:从不阻止默认调度程序:-)



最佳做法是使用模式显示在 2) 中,以便有一个可用于阻止操作的调度程序,并在那里执行它们。



希望这对您有帮助,快乐的哈克



讨论的Akka HTTP版本 2.0.1



使用的探查器:许多人私下回答了这个问题,问我什么探查器我曾经可视化线程上面图片中的状态,因此请在此处添加此信息:我使用了 YourKit 这是一个很棒的广告profiler(对于OSS是免费的),尽管您可以使用来自OpenJDK的免费 VisualVM 获得相同的结果。


I am trying to use Akka HTTP to basic authenticate my request. It so happens that I have an external resource to authenticate through, so I have to make a rest call to this resource.

This takes some time, and while it's processing, it seems the rest of my API is blocked, waiting for this call. I have reproduced this with a very simple example:

// used dispatcher:
implicit val system = ActorSystem()
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()


val routes = 
  (post & entity(as[String])) { e =>
    complete {
      Future{
        Thread.sleep(5000)
        e
      }
    }
  } ~
  (get & path(Segment)) { r =>
    complete {
      "get"
    }
  }

If I post to the log endpoint, my get endpoint is also stuck waiting for the 5 seconds, which the log endpoint dictated.

Is this expected behaviour, and if is, how do I make blocking operations without blocking my entire API?

解决方案

What you observe is expected behaviour – yet of course it's very bad. Good that known solutions and best practices exist to guard against it. In this answer I'd like to spend some time to explain the issue short, long, and then in depth – enjoy the read!

Short answer: "don't block the routing infrastructure!", always use a dedicated dispatcher for blocking operations!

Cause of the observed symptom: The problem is that you're using context.dispatcher as the dispatcher the blocking futures execute on. The same dispatcher (which is in simple terms just a "bunch of threads") is used by the routing infrastructure to actually handle the incoming requests – so if you block all available threads, you end up starving the routing infrastructure. (A thing up for debate and benchmarking is if Akka HTTP could protect from this, I'll add that to my research todo-list).

Blocking must be treated with special care to not impact other users of the same dispatcher (which is why we make it so simple to separate execution onto different ones), as explained in the Akka docs section: Blocking needs careful management.

Something else I wanted to bring to attention here is that one should avoid blocking APIs at all if possible - if your long running operation is not really one operation, but a series thereof, you could have separated those onto different actors, or sequenced futures. Anyway, just wanted to point out – if possible, avoid such blocking calls, yet if you have to – then the following explains how to properly deal with those.

In-depth analysis and solutions:

Now that we know what is wrong, conceptually, let's have a look what exactly is broken in the above code, and how the right solution to this problem looks like:

Colour = thread state:

  • turquoise – SLEEPING
  • orange - WAITING
  • green - RUNNABLE

Now let's investigate 3 pieces of code and how the impact the dispatchers, and performance of the app. To force this behaviour the app has been put under the following load:

  • [a] keep requesting GET requests (see above code in initial question for that), it's not blocking there
  • [b] then after a while fire 2000 POST requests, which will cause the 5second blocking before returning the future

1) [bad] Dispatcher behaviour on bad code:

// BAD! (due to the blocking in Future):
implicit val defaultDispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses defaultDispatcher
      Thread.sleep(5000)                    // will block on the default dispatcher,
      System.currentTimeMillis().toString   // starving the routing infra
    }
  }
}

So we expose our app to [a] load, and you can see a number of akka.actor.default-dispatcher threads already - they're handling the requests – small green snippet, and orange meaning the others are actually idle there.

Then we start the [b] load, which causes blocking of these threads – you can see an early thread "default-dispatcher-2,3,4" going into blocking after being idle before. We also observe that the pool grows – new threads are started "default-dispatcher-18,19,20,21..." however they go into sleeping immediately (!) – we're wasting precious resource here!

The number of the such started threads depends on the default dispatcher configuration, but likely will not exceed 50 or so. Since we just fired 2k blocking ops, we starve the entire threadpool – the blocking operations dominate such that the routing infra has no thread available to handle the other requests – very bad!

Let's do something about it (which is an Akka best practice btw – always isolate blocking behaviour like shown below):

2) [good!] Dispatcher behaviour good structured code/dispatchers:

In your application.conf configure this dispatcher dedicated for blocking behaviour:

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    // in Akka previous to 2.4.2:
    core-pool-size-min = 16
    core-pool-size-max = 16
    max-pool-size-min = 16
    max-pool-size-max = 16
    // or in Akka 2.4.2+
    fixed-pool-size = 16
  }
  throughput = 100
}

You should read more in the Akka Dispatchers documentation, to understand the various options here. The main point though is that we picked a ThreadPoolExecutor which has a hard limit of threads it keeps available for the blocking ops. The size settings depend on what your app does, and how many cores your server has.

Next we need to use it, instead of the default one:

// GOOD (due to the blocking in Future):
implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")

val routes: Route = post { 
  complete {
    Future { // uses the good "blocking dispatcher" that we configured, 
             // instead of the default dispatcher – the blocking is isolated.
      Thread.sleep(5000)
      System.currentTimeMillis().toString
    }
  }
}

We pressure the app using the same load, first a bit of normal requests and then we add the blocking ones. This is how the ThreadPools will behave in this case:

So initially the normal requests are easily handled by the default dispatcher, you can see a few green lines there - that's actual execution (I'm not really putting the server under heavy load, so it's mostly idle).

Now when we start issuing the blocking ops, the my-blocking-dispatcher-* kicks in, and starts up to the number of configured threads. It handles all the Sleeping in there. Also, after a certain period of nothing happening on those threads, it shuts them down. If we were to hit the server with another bunch of blocking the pool would start new threads that will take care of the sleep()-ing them, but in the meantime – we're not wasting our precious threads on "just stay there and do nothing".

When using this setup, the throughput of the normal GET requests was not impacted, they were still happily served on the (still pretty free) default dispatcher.

This is the recommended way of dealing with any kind of blocking in reactive applications. It often is referred to as "bulkheading" (or "isolating") the bad behaving parts of an app, in this case the bad behaviour is sleeping/blocking.

3) [workaround-ish] Dispatcher behaviour when blocking applied properly:

In this example we use the scaladoc for scala.concurrent.blocking method which can help when faced with blocking ops. It generally causes more threads to be spun up to survive the blocking operations.

// OK, default dispatcher but we'll use `blocking`
implicit val dispatcher = system.dispatcher

val routes: Route = post { 
  complete {
    Future { // uses the default dispatcher (it's a Fork-Join Pool)
      blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat, 
                 // but at the cost of exploding the number of threads (which eventually
                 // may also lead to starvation problems, but on a different layer)
        Thread.sleep(5000)
        System.currentTimeMillis().toString
       }
    }
  }
}

The app will behave like this:

You'll notice that A LOT of new threads are created, this is because blocking hints at "oh, this'll be blocking, so we need more threads". This causes the total time we're blocked to be smaller than in the 1) example, however then we have hundreds of threads doing nothing after the blocking ops have finished... Sure, they will eventually be shut down (the FJP does this), but for a while we'll have a large (uncontrolled) amount of threads running, in contrast to the 2) solution, where we know exactly how many threads we're dedicating for the blocking behaviours.

Summing up: Never block the default dispatcher :-)

The best practice is to use the pattern shown in 2), to have a dispatcher for the blocking operations available, and execute them there.

Hope this helps, happy hakking!

Discussed Akka HTTP version: 2.0.1

Profiler used: Many people have asked me in response to this answer privately what profiler I used to visualise the Thread states in the above pics, so adding this info here: I used YourKit which is an awesome commercial profiler (free for OSS), though you can achieve the same results using the free VisualVM from OpenJDK.

这篇关于Akka HTTP:将来阻止会阻止服务器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆