Akka stream Source.queue hangs when using fail overflow strategy


Question

The following Scala snippet doesn't seem to return:

val queue = 
  Source.queue[Unit](10, OverflowStrategy.fail)
    .throttle(1, 1 second, 1, ThrottleMode.shaping)
    .to(Sink.ignore)
    .run()

Await.result(
  (1 to 15).map(_ => queue.offer(())).last,
  Duration.Inf)

Is this a bug in Akka streams or am I doing something wrong?

Edit: to circle back, this was reported as a bug and accepted by the Akka project: https://github.com/akka/akka/issues/23078

Recommended answer

This program provides more insight:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}

import scala.concurrent.Await
import scala.concurrent.duration._

object Test extends App {
  implicit val actorSystem = ActorSystem()
  implicit val materializer = ActorMaterializer()
  import actorSystem.dispatcher

  val (queue, finalFuture) =
    Source.queue[Unit](10, OverflowStrategy.fail)
      .map(_ => println("Before throttle"))
      .throttle(1, 1.second, 1, ThrottleMode.shaping)
      .map(_ => println("After throttle"))
      .toMat(Sink.ignore)(Keep.both)
      .run()

  finalFuture.onComplete(r => println(s"Materialized future from ignore completed: $r"))

  Await.result((1 to 25).map(_ => queue.offer(()).map(e => println(s"Offer result: $e"))).last, Duration.Inf)
}

It prints the following for me:

Offer result: Enqueued
After throttle
Before throttle
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Failure(akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 10)!)
Materialized future from ignore completed: Failure(akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 10)!)

And sometimes it ends with an exception:

Before throttle
After throttle
Before throttle
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Enqueued
Offer result: Failure(akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 10)!)
Materialized future from ignore completed: Failure(akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 10)!)
Exception in thread "main" java.lang.IllegalStateException: Stream is terminated. SourceQueue is detached
    at akka.stream.impl.QueueSource$$anon$1$$anonfun$postStop$1.apply(Sources.scala:57)
    at akka.stream.impl.QueueSource$$anon$1$$anonfun$postStop$1.apply(Sources.scala:56)
    at akka.stream.stage.CallbackWrapper$$anonfun$invoke$1.apply$mcV$sp(GraphStage.scala:1373)
    at akka.stream.stage.CallbackWrapper$class.akka$stream$stage$CallbackWrapper$$locked(GraphStage.scala:1379)
    at akka.stream.stage.CallbackWrapper$class.invoke(GraphStage.scala:1369)
    at akka.stream.impl.QueueSource$$anon$1.invoke(Sources.scala:47)
    at akka.stream.impl.QueueSource$$anon$2.offer(Sources.scala:180)
    at test.Test$$anonfun$4.apply(Test.scala:25)
    at test.Test$$anonfun$4.apply(Test.scala:25)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at test.Test$.delayedEndpoint$test$Test$1(Test.scala:25)
    at test.Test$delayedInit$body.apply(Test.scala:10)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)

That is, you're seeing concurrency in action: the futures you submit run in parallel, and one of them finishes with a failure, but far more often the remaining ones just hang. If they happen to be ordered so that the failure comes first, you get an exception; otherwise Await.result on the last future waits forever.
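One way to avoid waiting forever on an offer that never completes is to race it against the stream's own completion signal. The sketch below is not part of the original answer. It assumes the same Akka version and imports as the program above, and it uses queue.watchCompletion() together with Future.firstCompletedOf. The object name RaceWithCompletion and the 10-second timeout are illustrative choices.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, ThrottleMode}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical example object; the name is not from the original answer.
object RaceWithCompletion extends App {
  implicit val actorSystem: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import actorSystem.dispatcher

  val queue =
    Source.queue[Unit](10, OverflowStrategy.fail)
      .throttle(1, 1.second, 1, ThrottleMode.shaping)
      .to(Sink.ignore)
      .run()

  // Completes, or fails, once the stream behind the queue has terminated.
  val streamDone: Future[Any] = queue.watchCompletion()

  // offer() may also throw synchronously once the stream is gone (see the
  // stack trace above), so capture that case as a failed future too.
  val offers: Seq[Future[Any]] =
    (1 to 15).map(_ => Future.fromTry(Try(queue.offer(()))).flatMap(identity))

  // Wait for whichever finishes first: the last offer or the stream itself.
  // The timeout is an assumed safety net, not something the answer prescribes.
  val outcome =
    Try(Await.result(Future.firstCompletedOf(Seq(streamDone, offers.last)), 10.seconds))
  println(s"Outcome: $outcome")

  actorSystem.terminate()
}
```

Because the stream's completion future fails when the buffer overflows, the await returns even if the last offer itself never completes.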

To determine whether your stream has actually terminated, you have to inspect it directly, as done above. Most importantly, you should push no more than the configured number of events into the queue. If you do need to push more and you use OverflowStrategy.backpressure, always wait for the future returned by the previous offer() to complete before calling the next offer().
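The advice about waiting for each offer can be sketched as follows. This is a minimal illustration, not the answer author's code: it assumes OverflowStrategy.backpressure, the same Akka version as the program above, and a shorter throttle interval so that it finishes quickly. The object name SequentialOffers and the element type Int are illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult, ThrottleMode}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object; the name is not from the original answer.
object SequentialOffers extends App {
  implicit val actorSystem: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import actorSystem.dispatcher

  val (queue, done) =
    Source.queue[Int](10, OverflowStrategy.backpressure)
      .throttle(1, 100.millis, 1, ThrottleMode.shaping)
      .toMat(Sink.ignore)(Keep.both)
      .run()

  // Chain the offers with flatMap so that each offer() is only called
  // after the future returned by the previous offer() has completed.
  val allOffered: Future[Vector[QueueOfferResult]] =
    (1 to 15).foldLeft(Future.successful(Vector.empty[QueueOfferResult])) { (acc, i) =>
      acc.flatMap(previous => queue.offer(i).map(previous :+ _))
    }

  val results = Await.result(allOffered, 30.seconds)
  println(s"Enqueued ${results.count(_ == QueueOfferResult.Enqueued)} of ${results.size}")

  // Signal that no more elements will come, then wait for the stream to drain.
  queue.complete()
  Await.result(done, 30.seconds)
  actorSystem.terminate()
}
```

Folding over the inputs keeps at most one offer in flight, which is what the backpressure strategy expects; offering more than the buffer size then slows the producer down instead of failing the stream.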
