Getting Akka dead letters when using pipeTo between two actors


Problem description

I have a use case with the following actor hierarchy:

parent -> childABC -> workerchild

The worker child does the work and sends its result to its parent (childABC, which is itself a child of parent), and childABC sends the result back to the parent actor. I am using pipeTo and getting dead letters. Here is my code.

Parent actor:

final case object GetFinalValue

class MyActor extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  val myManageActor = context.actorOf(Props[ManagerMyActor],"Managemyactor")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

  override def receive: Receive = {
    case GetFinalValue=>
      ask(myManageActor,GetValue).pipeTo(sender())

    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }

}

childABC (per the example above):

final case object GetValue

class ManagerMyActor extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)
  val myTokenActor = context.actorOf(Props[TokenMyActor2],"toknMyActor2")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

  override def receive: Receive = {
    case GetValue=>
      ask(myTokenActor,CalculateValue).pipeTo(sender())
 
    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }

}

Worker child actor:

final case object CalculateValue

class TokenMyActor2 extends Actor{
  import context.dispatcher
  import akka.pattern.pipe
  val log = LoggerFactory.getLogger(this.getClass)

  override def receive: Receive = {
    case CalculateValue=>
      val future = Future{ "get the string"
      }
      val bac = future.map{result =>
          sender ! result
      }//.pipeTo(sender())


    case message =>
      log.warn("Actor MyActor: Unhandled message received : {}", message)
      unhandled(message)
  }

}


def main(args: Array[String]): Unit = {
    implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

    val myActor = system.actorOf(Props[MyActor],"myActor")
    val future = ask(myActor, GetFinalValue).mapTo[String]
    future.map {str =>
      log.info ("string is {}",str)
    }

Here are the logs:

[INFO] [akkaDeadLetter][01/12/2021 19:17:22.000] [api-akka.actor.default-dispatcher-5] [akka://api/deadLetters] Message [java.lang.String] from Actor[akka://api/user/myActor/Managemyactor/toknMyActor2#1239397461] to Actor[akka://api/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.989] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor#1829301550] to Actor[akka://api/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.996] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor/Managemyactor#-269929265] to Actor[akka://api/deadLetters] was not delivered. [3] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Please point out where I went wrong. Should pipeTo not be used like this? If so, what should I do to make it work?

Recommended answer

Not sure whether it is intentional, but ask(myManageActor, GetValue).pipeTo(sender()) can be implemented with forward.

class MyActor extends Actor {
  lazy val myManageActor: ActorRef = ???

  override def receive: Receive = {
    case GetFinalValue =>
      myManageActor.forward(GetValue)
  }
}

forward is the same as tell, but it preserves the original sender of the message.
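To make the difference concrete, here is a minimal sketch, assuming Akka classic actors on the classpath. The names Ping, Echo, Relay, and ForwardDemo are hypothetical and exist only for this illustration. Relay passes the message on with forward, so Echo's reply goes straight to the original asker instead of to Relay.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical message and actors, for illustration only.
case object Ping

// Replies to whoever is recorded as the sender of the message.
class Echo extends Actor {
  override def receive: Receive = { case Ping => sender() ! "pong" }
}

// Passes Ping on to Echo while keeping the original sender.
class Relay(echo: ActorRef) extends Actor {
  override def receive: Receive = {
    case Ping =>
      echo.forward(Ping)
      // Equivalent call: echo.tell(Ping, sender())
  }
}

object ForwardDemo {
  def run(): String = {
    val system = ActorSystem("forwardDemo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val echo  = system.actorOf(Props[Echo](), "echo")
      val relay = system.actorOf(Props(new Relay(echo)), "relay")
      // The ask is only used at the edge, outside any actor.
      Await.result((relay ? Ping).mapTo[String], 3.seconds)
    } finally system.terminate()
  }
}
```

Calling ForwardDemo.run() should return "pong": the temporary actor created by ask stays the sender all the way down, so Echo answers it directly.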

This can be applied to both MyActor and ManagerMyActor.

In the case of TokenMyActor2, you should not use

future.map{ result =>
          sender ! result
      }

because it breaks actor encapsulation, as described in the Akka docs:

When using future callbacks, such as onComplete, or map such as thenRun, or thenApply inside actors you need to carefully avoid closing over the containing actor’s reference, i.e. do not call methods or access mutable state on the enclosing actor from within the callback. This would break the actor encapsulation and may introduce synchronization bugs and race conditions because the callback will be scheduled concurrently to the enclosing actor. Unfortunately there is not yet a way to detect these illegal accesses at compile time. See also: Actors and shared mutable state

You should instead rely on Future(???).pipeTo(sender()), which is safe to use with sender(): the sender() argument is evaluated immediately on the actor's own thread when pipeTo is called, not later inside the future callback.
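The following sketch, assuming Akka classic, shows the safe form next to a hand-written equivalent. Compute, SafeWorker, and PipeDemo are hypothetical names introduced for this example. In both variants the reply target is read while the actor is still processing the message, so the callback never touches the actor's context.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical message, for illustration only.
case object Compute

class SafeWorker extends Actor {
  import context.dispatcher // execution context for the Future

  override def receive: Receive = {
    case Compute =>
      // sender() is evaluated here, on the actor's thread, before the
      // future completes, so the reply target is already fixed.
      Future("computed").pipeTo(sender())

      // Hand-written equivalent for the success case:
      //   val replyTo = sender()
      //   Future("computed").foreach(replyTo ! _)
  }
}

object PipeDemo {
  def run(): String = {
    val system = ActorSystem("pipeDemo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val worker = system.actorOf(Props[SafeWorker](), "safeWorker")
      Await.result((worker ? Compute).mapTo[String], 3.seconds)
    } finally system.terminate()
  }
}
```

PipeDemo.run() should return "computed". By contrast, calling sender inside future.map reads the context from another thread after the actor may have moved on, which is a plausible explanation for the first dead letter in the logs (a String sent from toknMyActor2 to deadLetters).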

After applying these changes, the code works as expected:

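// Imports needed by the snippets below
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration.SECONDS

case object GetFinalValue
case object GetValue
case object CalculateValue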

class MyActor extends Actor {
  private val myManageActor: ActorRef =
    context.actorOf(Props[ManagerMyActor], "myManageActor")

  override def receive: Receive = { case GetFinalValue =>
    myManageActor.forward(GetValue)
  }
}

class ManagerMyActor extends Actor {
  private val myTokenActor =
    context.actorOf(Props[TokenMyActor2], "toknMyActor2")

  override def receive: Receive = { case GetValue =>
    myTokenActor.forward(CalculateValue)
  }

}

class TokenMyActor2 extends Actor {
  import context.dispatcher

  override def receive: Receive = { case CalculateValue =>
    val future = Future { "get the string" }
    future.pipeTo(sender())
  }
}

implicit val timeout = Timeout(3, SECONDS)
implicit val system = ActorSystem("adasd")
import system.dispatcher
val myActor = system.actorOf(Props[MyActor], "myActor")
val future = ask(myActor, GetFinalValue).mapTo[String]
future.foreach { str =>
  println(s"got $str")
}

This prints got get the string.

As a final note, I'd advise against using the ask pattern inside actors. The basic functionality of ask can easily be achieved with just tell and forward. The code is also shorter and is not cluttered by the constant need for an implicit val timeout.
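When an intermediate actor needs to post-process the reply, forward alone is not enough, but tell still is. Below is a minimal sketch, assuming Akka classic, in which the original requester travels inside the messages. GetTotal, Work, Done, Worker, Coordinator, and TellDemo are hypothetical names for this illustration, not part of the question's code.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical protocol: the original requester is carried in the messages.
case object GetTotal
final case class Work(replyTo: ActorRef)
final case class Done(value: Int, replyTo: ActorRef)

class Worker extends Actor {
  override def receive: Receive = {
    // Answer the coordinator and hand back the requester reference.
    case Work(replyTo) => sender() ! Done(42, replyTo)
  }
}

class Coordinator extends Actor {
  private val worker = context.actorOf(Props[Worker](), "worker")

  override def receive: Receive = {
    // Plain tell: no Future, no timeout inside the actor.
    case GetTotal => worker ! Work(sender())
    // The reply arrives as a normal message, so post-processing
    // runs inside the actor's own receive.
    case Done(value, replyTo) => replyTo ! (value + 1)
  }
}

object TellDemo {
  def run(): Int = {
    val system = ActorSystem("tellDemo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val coordinator = system.actorOf(Props[Coordinator](), "coordinator")
      // ask appears only at the boundary, outside the actor system's actors.
      Await.result((coordinator ? GetTotal).mapTo[Int], 3.seconds)
    } finally system.terminate()
  }
}
```

TellDemo.run() should return 43. The trade-off is a slightly larger message protocol in exchange for removing futures and timeouts from the actors themselves.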
