为什么Akka Streams吞没我的例外? [英] Why is Akka Streams swallowing my exceptions?

查看:135
本文介绍了为什么Akka Streams吞没我的例外?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

为什么

中的例外是

  import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

对象TestExceptionHandling {
def main(args:Array [String]):Unit = {
implicit val actorSystem = ActorSystem()
隐式val物质= ActorMaterializer()(defaultActorSystem)

源(列表(1,2,3))。map {i =>
if(i == 2){
throw new RuntimeException(请不要吞下我!)
} else {
i
}
} .runForeach {i =>
println(sReceived $ i)
}
}
}

默认忽略?我可以看到,在打印收到1 之后,流被停止,但没有记录。请注意,问题不在于一般的日志配置,因为我在 akka.log-config-on-start = on 在我的 application.conf file。

解决方案

我正在使用自定义 Supervision.Decider ,确保正常记录了异常,可以这样设置:

  val decider:Supervision.Decider = {e => 
logger.error(未处理的异常流,e)
Supervision.Stop
}

隐式val actorSystem = ActorSystem()
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)

另外,正如维柯巴生所指出的,在上面给出的例子中,例外情况也可以通过

  Source(List(1,2,3))抓住map {i => 
if(i == 2){
throw new RuntimeException(请不要吞下我!)
} else {
i
}
} .runForeach {i =>
println(sReceived $ i)
} .onComplete {
case Success(_)=>
println(Done)
case Failure(e)=>
println(sFailed with $ e)
}

,这种方法不会帮助你使用

  Source(List(1,2,3))。map {i = > 
if(i == 2){
throw new RuntimeException(请不要吞下我!)
} else {
i
}
} .to(Sink.foreach {i =>
println(sReceived $ i)
})。run()
run()返回单位

Why is the exception in

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

object TestExceptionHandling {
  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem()
    implicit val materializer = ActorMaterializer()(defaultActorSystem)

    Source(List(1, 2, 3)).map { i =>
      if (i == 2) {
        throw new RuntimeException("Please, don't swallow me!")
      } else {
        i
      }
    }.runForeach { i =>
      println(s"Received $i")
    }
  }
}

silently ignored? I can see that the stream gets stopped after printing Received 1, but nothing is logged. Note that the problem is not the logging configuration in general, as I see a lot of output if I set akka.log-config-on-start = on in my application.conf file.

解决方案

I'm now using a custom Supervision.Decider that makes sure exceptions are properly logged, that can be set up like this:

val decider: Supervision.Decider = { e =>
  logger.error("Unhandled exception in stream", e)
  Supervision.Stop
}

implicit val actorSystem = ActorSystem()
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)

Also, as has been pointed out by Vikor Klang, in the example given above, the exception could also be "caught" via

Source(List(1, 2, 3)).map { i =>
  if (i == 2) {
    throw new RuntimeException("Please, don't swallow me!")
  } else {
    i
  }
}.runForeach { i =>
  println(s"Received $i")
}.onComplete {
  case Success(_) =>
    println("Done")
  case Failure(e) =>
    println(s"Failed with $e")
}

Note however, that this approach won't help you with

Source(List(1, 2, 3)).map { i =>
  if (i == 2) {
    throw new RuntimeException("Please, don't swallow me!")
  } else {
    i
  }
}.to(Sink.foreach { i =>
  println(s"Received $i")
}).run()

since run() returns Unit.

这篇关于为什么Akka Streams吞没我的例外?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆