Akka:等待多条消息 [英] Akka: waiting for multiple messages

查看:181
本文介绍了Akka:等待多条消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

您好akka大师:)您能指导我吗?



我想做的-演员A向演员B询问信息,然后等着回来。但是,演员B以某种方式不给A一条消息,而是给其中4条消息。 Actor A Future 已正确完成,但是其余3条消息被视为无效字母。为什么?这是正确的吗?我的意思是,演员A有一个适当的处理程序,那为什么字母没了呢? :-(


[INFO] [11/22/2013 22:00:38.975] [ForkJoinPool-2-worker-7]
[akka:// actors / user / a]得到了结果pong [INFO] [2013年11月22日
22:00:38.976] [actors-akka.actor.default-dispatcher-4]
[akka:// actors / deadLetters]消息[java.lang.String]从
Actor [akka:// actors / user / b#-759739990]发送至
Actor [akka:// actors / deadLetters]未传送[1]遇到死字母
。可以使用
配置设置'akka.log-dead-letters'和
'akka来关闭或调整此日志记录.log-dead-letters-during-shutdown'。
...同一条消息多2次...


请看一下代码。

  package head_thrash 

import akka.actor._
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.ask
import scala.concurrent.Future
import scala.concurrent.ExecutionContext .Implicits.global

对象Mai n扩展App {
val system = ActorSystem( actors)

val a = system.actorOf(Props [A], a)
val b = system。 actorOf(Props [B], b)

a! ping
system.awaitTermination()
}

A类使用ActorLogging扩展Actor {

隐式val超时=超时(5.seconds)

def接收= {
情况为 ping => {
val b = context.actorSelection( ../ b)
val future:Future [String] = ask(b, ping)。mapTo [String]
future。 onSuccess {
case result:String⇒{
log.info( Got result + result)//<-在这里得到结果pong,没关系
}
}
}
case pong => {
log.info( hmmm ...)
}
}
}

B类通过ActorLogging扩展了Actor {
def receive = {{
case ping => {
个寄件者! pong
发件人! pong //<-死信!
个发件人! pong //<-死信!
个发件人! pong //<-死信!
}
}
}

这真的让我感到困惑。现在您可以问-嘿,您为什么需要B发送许多消息?好吧,那是更复杂的情况的一部分-A向B询问消息。 B答案。然后A 等待再收到B的另一条消息。这里最棘手的部分是普通在Future完成后等待-我只是不介意使该模型适合



但是现在,如何才能正确处理所有4条消息,而不会出现死信呢?谢谢:-D

解决方案

您的问题是演员B没有回答演员A。如果我们阅读文档我们发现询问创建一个临时的一次性演员以接收对消息的回复,并使用它完成 scala.concurrent.Future 。 / p>

此临时演员根本不处理 pong 消息,他只是在等待您的答复。然后将其转换为字符串的未来。



如果您要解决此问题,则必须修改演员B,以便它首先响应临时的询问演员,然后然后将消息直接发送到演员A。

  B类使用ActorLogging扩展了Actor {
def receive = {
情况 ping => {
个寄件者! pong //发件人是临时询问演员
val a = context.actorSelection( ../ a)//获取演员A
a的引用! bong
a! bong
a! pong
}
}
}

真的很干净,但是现在我希望您了解发生了什么。


Hi akka gurus:) Can you guide me in this one?

What I'm trying to do - Actor A asks Actor B for a message and wait's for one to arrive back. But, somehow Actor B gives back to A not one message, but 4 of them. Actor A Future completes properly, but 3 of rest messages are counted as dead letters. Why? Is this right? I mean, Actor A has a proper handler, why the letters are dead then? :-(

[INFO] [11/22/2013 22:00:38.975] [ForkJoinPool-2-worker-7] [akka://actors/user/a] Got result pong [INFO] [11/22/2013 22:00:38.976] [actors-akka.actor.default-dispatcher-4] [akka://actors/deadLetters] Message [java.lang.String] from Actor[akka://actors/user/b#-759739990] to Actor[akka://actors/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. ...Same message 2 more times...

Please take a look at the code.

package head_thrash

import akka.actor._
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.ask
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Main extends App {
  val system = ActorSystem("actors")

  val a = system.actorOf(Props[A], "a")
  val b = system.actorOf(Props[B], "b")

  a ! "ping"
  system.awaitTermination()
}

class A extends Actor with ActorLogging {

  implicit val timeout = Timeout(5.seconds)

  def receive = {
    case "ping" => {
      val b = context.actorSelection("../b")
      val future: Future[String] = ask(b, "ping").mapTo[String]
      future.onSuccess {
        case result: String ⇒ {
          log.info("Got result " + result) // <-- got result pong here, that's okay
        }
      }
    }
    case "pong" => {
      log.info("hmmm...")
    }
  }
}

class B extends Actor with ActorLogging {
  def receive = {
    case "ping" => {
      sender ! "pong"
      sender ! "pong" // <-- dead letter!
      sender ! "pong" // <-- dead letter!
      sender ! "pong" // <-- dead letter!
    }
  }
}

That really confuses me. Now you can ask - hey man, why do you need B to send many messages? Well, that is the part of more complicated case - A asks B for message. B answers. Then A waits for another message from B. The tricky part here is plain waiting after the Future completes - and I just can't make my mind to get that model fit into Akka basis.

But fow now, how can I get all 4 messages handled correctly, without dead letters? Thanks :-D

解决方案

Your problem is that actor B is not answering to actor A. If we read the documentation of the ask pattern we find that ask create a temporary one-off actor for receiving a reply to a message and complete a scala.concurrent.Future with it.

This temporary actor does not handle "pong" messages at all, he is just waiting for any answer that you are then casting as a Future of String.

If you want to fix this, you have to modify your actor B so that first it answers to the temporary "ask actor" and then sends messages directly to actor A.

class B extends Actor with ActorLogging {
  def receive = {
    case "ping" => {
      sender ! "pong"  //the sender is the temp ask actor
      val a = context.actorSelection("../a") // get a ref on actor A
      a ! "pong"
      a ! "pong"
      a ! "pong"
    }
  }
}

This is not really clean, but now I hope you understand what is going on.

这篇关于Akka:等待多条消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆