Akka / Scala:映射Future与pipeTo [英] Akka/Scala: mapping Future vs pipeTo

查看:277
本文介绍了Akka / Scala:映射Future与pipeTo的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Akka 角色中,在发送 Future之间是否存在任何差异-在使用线程数或线程锁定方面 code>通过以下方式将结果提供给另一个演员:

In Akka actors, are there any differences - in terms of number of threads being used, or thread locking - between sending a Future result to another actor by:

A。将 Future 映射到功能,以将 tell 的结果转换为演员。

A. mapping the Future to function that tell the result to the actor.

B。定义将来的 onSuccess 回调,将 tell 的结果发送给参与者。

B. defining an onSuccess callback on the future, which tell the result to the actor.

C。通过 pipeTo Future 结果传递给演员。

C. piping the Future result to the actor with pipeTo.

在上一个问题中讨论了其中一些选项:

Some of these options are discussed in a previous question:

Akka: Send a future message to an Actor

其中三个是首选这样做的方式,为什么?

此外,我想知道,如果收到的类型应为 Any =>单位,然后为什么编译代码,在某些情况下,的部分函数接收返回未来,而不是单位

Also, I would like to know, if receive should be of type Any => Unit, then why does the code compile when in some cases the partial function of receive returns a Future, not Unit?

以下是我上面提到的三个选项的代码示例:

Here is a code example of the three options that I mentioned above:

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import akka.pattern.pipe

import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Success

class ActorIncrement extends Actor {

  def receive = {
    case i: Int =>
      println(s"increment $i")
      sender ! i + 1
  }
}

class ActorEven extends Actor {

  def receive = {
    case i: Int =>
      println(s"$i is even")
  }
}


class ActorOdd extends Actor {

  def receive = {
    case i: Int =>
      println(s"$i is odd")
  }
}

class MyActor(actorIncrement: ActorRef, actorEven: ActorRef, actorOdd: ActorRef) extends Actor {
  import scala.concurrent.ExecutionContext.Implicits.global

  implicit val timeout = Timeout(5 seconds)

  def receive = {
    case i: Int if i % 2 == 0 =>
      println(s"receive a: $i")
      actorIncrement ? i map {
        case j: Int =>
          println(s"$j from increment a")
          actorOdd ! j
      }
    case i: Int =>
      println(s"receive b: $i")
      val future: Future[Any] = actorIncrement ? i
      future onSuccess {
        case i: Int =>
          println(s"$i from increment b")
          actorEven ! i
      }

    case s: String =>
      println(s"receive c: $s")
      (actorIncrement ? s.toInt).mapTo[Int] filter(_ % 2 == 0) andThen { case Success(i: Int) => println(s"$i from increment c") } pipeTo actorEven
  }
}

object TalkToActor extends App {

  // Create the 'talk-to-actor' actor system
  val system = ActorSystem("talk-to-actor")

  val actorIncrement = system.actorOf(Props[ActorIncrement], "actorIncrement")
  val actorEven = system.actorOf(Props[ActorEven], "actorEven")
  val actorOdd = system.actorOf(Props[ActorOdd], "actorOdd")

  val myActor = system.actorOf(Props(new MyActor(actorIncrement, actorEven, actorOdd)), "myActor")

  myActor ! 2
  myActor ! 7
  myActor ! "11"

  Thread.sleep(1000)

  //shutdown system
  system.terminate()
}


推荐答案

如果您看一下 pipeTo akka.pattern.PipeToSupport

def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = 
  Actor.noSender): Future[T] = {
    future andThen {
      case Success(r) ⇒ recipient ! r
      case Failure(f) ⇒ recipient ! Status.Failure(f)
    }
  }
}

如您所见... pipeTo 与将 andThen 调用添加到没什么不同未来,它会发送未来结果或 Status.Failure 消息给管道演员,以防您的 Future 失败。

As you can see... pipeTo is nothing different than just adding andThen call to your Future which either sends the future-result or a Status.Failure message to the piped actor in case your Future fails.

现在主要区别在于此 Status.Failure 失败处理。如果您不使用 pipeTo ,则可以以任何想要的方式处理失败。

Now the main difference lies in this Status.Failure failure-handling. If you are not using pipeTo, you can handle your failure in whatever way you want to.

这篇关于Akka / Scala:映射Future与pipeTo的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆