为什么Akka Streams吞没我的例外? [英] Why is Akka Streams swallowing my exceptions?
问题描述
为什么
中的例外是 import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
对象TestExceptionHandling {
def main(args:Array [String]):Unit = {
implicit val actorSystem = ActorSystem()
隐式val物质= ActorMaterializer()(defaultActorSystem)
源(列表(1,2,3))。map {i =>
if(i == 2){
throw new RuntimeException(请不要吞下我!)
} else {
i
}
} .runForeach {i =>
println(sReceived $ i)
}
}
}
默认忽略?我可以看到,在打印收到1
之后,流被停止,但没有记录。请注意,问题不在于一般的日志配置,因为我在 akka.log-config-on-start = on
在我的 application.conf
file。
我正在使用自定义 Supervision.Decider
,确保正常记录了异常,可以这样设置:
val decider:Supervision.Decider = {e =>
logger.error(未处理的异常流,e)
Supervision.Stop
}
隐式val actorSystem = ActorSystem()
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)
另外,正如维柯巴生所指出的,在上面给出的例子中,例外情况也可以通过
Source(List(1,2,3))抓住map {i =>
if(i == 2){
throw new RuntimeException(请不要吞下我!)
} else {
i
}
} .runForeach {i =>
println(sReceived $ i)
} .onComplete {
case Success(_)=>
println(Done)
case Failure(e)=>
println(sFailed with $ e)
}
,这种方法不会帮助你使用
Source(List(1,2,3))。map {i = >
$由于
if(i == 2){
throw new RuntimeException(请不要吞下我!)
} else {
i
}
} .to(Sink.foreach {i =>
println(sReceived $ i)
})。run()
run()
返回单位
。Why is the exception in
import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source object TestExceptionHandling { def main(args: Array[String]): Unit = { implicit val actorSystem = ActorSystem() implicit val materializer = ActorMaterializer()(defaultActorSystem) Source(List(1, 2, 3)).map { i => if (i == 2) { throw new RuntimeException("Please, don't swallow me!") } else { i } }.runForeach { i => println(s"Received $i") } } }
silently ignored? I can see that the stream gets stopped after printing
Received 1
, but nothing is logged. Note that the problem is not the logging configuration in general, as I see a lot of output if I setakka.log-config-on-start = on
in myapplication.conf
file.解决方案I'm now using a custom
Supervision.Decider
that makes sure exceptions are properly logged, that can be set up like this:val decider: Supervision.Decider = { e => logger.error("Unhandled exception in stream", e) Supervision.Stop } implicit val actorSystem = ActorSystem() val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider) implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)
Also, as has been pointed out by Vikor Klang, in the example given above, the exception could also be "caught" via
Source(List(1, 2, 3)).map { i => if (i == 2) { throw new RuntimeException("Please, don't swallow me!") } else { i } }.runForeach { i => println(s"Received $i") }.onComplete { case Success(_) => println("Done") case Failure(e) => println(s"Failed with $e") }
Note however, that this approach won't help you with
Source(List(1, 2, 3)).map { i => if (i == 2) { throw new RuntimeException("Please, don't swallow me!") } else { i } }.to(Sink.foreach { i => println(s"Received $i") }).run()
since
run()
returnsUnit
.这篇关于为什么Akka Streams吞没我的例外?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!