Getting Akka dead letters when using pipeTo between two actors
Problem description
I have a use case with the following actor hierarchy:
parent -> childABC -> workerchild
The worker child does its work and sends the result to its parent (childABC, which is itself a child of parent), and that child actor (childABC) sends the result back to the parent actor. I am using pipeTo for this and am getting dead letters. Here is my code.
parent actor:
final case object GetFinalValue

class MyActor extends Actor {
  import context.dispatcher
  import akka.pattern.pipe

  val log = LoggerFactory.getLogger(this.getClass)
  val myManageActor = context.actorOf(Props[ManagerMyActor], "Managemyactor")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

  override def receive: Receive = {
    case GetFinalValue =>
      ask(myManageActor, GetValue).pipeTo(sender())
    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }
}
childABC (according to the example I gave above):
final case object GetValue

class ManagerMyActor extends Actor {
  import context.dispatcher
  import akka.pattern.pipe

  val log = LoggerFactory.getLogger(this.getClass)
  val myTokenActor = context.actorOf(Props[TokenMyActor2], "toknMyActor2")
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)

  override def receive: Receive = {
    case GetValue =>
      ask(myTokenActor, CalculateValue).pipeTo(sender())
    case message =>
      log.warn(" Unhandled message received : {}", message)
      unhandled(message)
  }
}
child actor:
final case object CalculateValue

class TokenMyActor2 extends Actor {
  import context.dispatcher
  import akka.pattern.pipe

  val log = LoggerFactory.getLogger(this.getClass)

  override def receive: Receive = {
    case CalculateValue =>
      val future = Future { "get the string" }
      val bac = future.map { result =>
        sender ! result
      } //.pipeTo(sender())
    case message =>
      log.warn("Actor MyActor: Unhandled message received : {}", message)
      unhandled(message)
  }
}
def main(args: Array[String]): Unit = {
  implicit val timeout = Timeout(ReadTimeIntervalValue.getInterval(), SECONDS)
  val myActor = system.actorOf(Props[MyActor], "myActor")
  val future = ask(myActor, GetFinalValue).mapTo[String]
  future.map { str =>
    log.info("string is {}", str)
  }
}
Here are the logs:
[INFO] [akkaDeadLetter][01/12/2021 19:17:22.000] [api-akka.actor.default-dispatcher-5] [akka://api/deadLetters] Message [java.lang.String] from Actor[akka://api/user/myActor/Managemyactor/toknMyActor2#1239397461] to Actor[akka://api/deadLetters] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.989] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor#1829301550] to Actor[akka://api/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akkaDeadLetter][01/12/2021 19:17:41.996] [api-akka.actor.default-dispatcher-7] [akka://api/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://api/user/myActor/Managemyactor#-269929265] to Actor[akka://api/deadLetters] was not delivered. [3] dead letters encountered. If this is not an expected behavior then Actor[akka://api/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
Please guide me on where I am mistaken. Should pipeTo not be used like this? If so, what should I do to make it work?
Recommended answer
Not sure if it's intended or not, but ask(myManageActor, GetValue).pipeTo(sender()) can be implemented as forward.
class MyActor extends Actor {
  lazy val myManageActor: ActorRef = ???

  override def receive: Receive = {
    case GetFinalValue =>
      myManageActor.forward(GetValue)
  }
}
forward is the same as tell, but it preserves the original sender of the message.
This can be applied to both MyActor and ManagerMyActor.
In the case of TokenMyActor2, you should not use
future.map { result =>
  sender ! result
}
as it breaks Akka context encapsulation, as specified in the docs:
When using future callbacks, such as onComplete, or map such as thenRun, or thenApply inside actors you need to carefully avoid closing over the containing actor’s reference, i.e. do not call methods or access mutable state on the enclosing actor from within the callback. This would break the actor encapsulation and may introduce synchronization bugs and race conditions because the callback will be scheduled concurrently to the enclosing actor. Unfortunately there is not yet a way to detect these illegal accesses at compile time. See also: Actors and shared mutable state
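The hazard described in that passage can be reproduced without Akka at all. The following is a minimal sketch using only the Scala standard library; `ClosingOverDemo` and `currentSender` are hypothetical names standing in for an actor and its per-message sender, not Akka APIs. It contrasts reading mutable state inside a future callback with capturing the value before the callback is registered.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ClosingOverDemo {
  // Hypothetical stand-in for an actor's per-message state (like sender()).
  @volatile var currentSender: String = "none"

  def run(): (String, String) = {
    // The promise lets us control exactly when the callbacks fire.
    val gate = Promise[String]()

    currentSender = "asker-1"
    val captured = currentSender // read eagerly, while "handling the message"

    // Unsafe: the mutable field is read later, when the callback runs.
    val unsafe = gate.future.map(_ => currentSender)
    // Safe: the callback only closes over an immutable local value.
    val safe = gate.future.map(_ => captured)

    // The "actor" moves on before the future completes.
    currentSender = "deadLetters"
    gate.success("result")

    (Await.result(unsafe, 1.second), Await.result(safe, 1.second))
  }
}
```

Calling `ClosingOverDemo.run()` returns `("deadLetters", "asker-1")`: the callback that read the field late sees the changed value, while the one that used the captured value still sees the original requester.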
You should instead rely on Future(???).pipeTo(sender()), which is safe to use with sender().
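The reason this is safe is that the argument `sender()` is evaluated immediately, on the actor's own thread, when `pipeTo` is called, not later when the future completes. The sketch below makes that explicit by capturing the sender in a local value first. It assumes Akka classic is on the classpath; `replyTo` and `CaptureSenderDemo` are names introduced here for illustration, and the actor and message names are reused from the question.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

case object CalculateValue

class TokenMyActor2 extends Actor {
  import context.dispatcher

  override def receive: Receive = {
    case CalculateValue =>
      // Capture the sender now, while this message is being processed.
      val replyTo = sender()
      // The future only closes over the immutable local `replyTo`.
      // pipeTo also delivers a Status.Failure if the future fails.
      Future { "get the string" }.pipeTo(replyTo)
  }
}

object CaptureSenderDemo {
  def run(): String = {
    implicit val timeout: Timeout = Timeout(3.seconds)
    val system = ActorSystem("capture-demo")
    try {
      val worker = system.actorOf(Props[TokenMyActor2](), "toknMyActor2")
      Await.result((worker ? CalculateValue).mapTo[String], 3.seconds)
    } finally system.terminate()
  }
}
```

Writing `pipeTo(sender())` directly behaves the same way; the local value only makes the early evaluation visible.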
After applying these changes, the code works as expected:
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration.SECONDS

case object GetFinalValue
case object GetValue
case object CalculateValue

class MyActor extends Actor {
  private val myManageActor: ActorRef =
    context.actorOf(Props[ManagerMyActor], "myManageActor")

  // forward keeps the original asker as the sender
  override def receive: Receive = { case GetFinalValue =>
    myManageActor.forward(GetValue)
  }
}

class ManagerMyActor extends Actor {
  private val myTokenActor =
    context.actorOf(Props[TokenMyActor2], "toknMyActor2")

  override def receive: Receive = { case GetValue =>
    myTokenActor.forward(CalculateValue)
  }
}

class TokenMyActor2 extends Actor {
  import context.dispatcher

  // sender() is evaluated here, before the future completes
  override def receive: Receive = { case CalculateValue =>
    val future = Future { "get the string" }
    future.pipeTo(sender())
  }
}

implicit val timeout = Timeout(3, SECONDS)
implicit val system = ActorSystem("adasd")
import system.dispatcher

val myActor = system.actorOf(Props[MyActor], "myActor")
val future = ask(myActor, GetFinalValue).mapTo[String]
future.foreach { str =>
  println(s"got $str")
}
This prints got get the string.
As a final note, I'd advise against using the ask pattern within actors. The basic functionality of ask can be easily achieved with just tell and forward. The code is also shorter and not cluttered by the constant need for an implicit val timeout.
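When an actor needs to post-process the child's reply, forward alone is not enough, but plain tell still works if the request carries the original requester. The following is a rough sketch of that idea, assuming Akka classic; the `Calculate` and `Calculated` messages and the `Worker`, `Parent` and `TellDemo` names are hypothetical and not part of the question's code.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

case object GetFinalValue
// Hypothetical protocol: the request carries the original requester and the
// reply echoes it back, so the parent keeps no per-request state.
final case class Calculate(requester: ActorRef)
final case class Calculated(requester: ActorRef, value: String)

class Worker extends Actor {
  override def receive: Receive = {
    case Calculate(requester) =>
      // Reply to the parent, passing the requester along.
      sender() ! Calculated(requester, "get the string")
  }
}

class Parent extends Actor {
  private val worker = context.actorOf(Props[Worker](), "worker")

  override def receive: Receive = {
    case GetFinalValue =>
      worker ! Calculate(sender()) // plain tell, no timeout required
    case Calculated(requester, value) =>
      requester ! value.toUpperCase // post-process, then answer the asker
  }
}

object TellDemo {
  def run(): String = {
    implicit val timeout: Timeout = Timeout(3.seconds)
    val system = ActorSystem("tell-demo")
    try {
      val parent = system.actorOf(Props[Parent](), "parent")
      // ask is used only at the boundary, outside the actors.
      Await.result((parent ? GetFinalValue).mapTo[String], 3.seconds)
    } finally system.terminate()
  }
}
```

Here `TellDemo.run()` returns `"GET THE STRING"`. The only timeout left is the one at the call site outside the actor system, which is where ask is typically needed.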