Akka / Scala:映射Future与pipeTo [英] Akka/Scala: mapping Future vs pipeTo
问题描述
在 Akka
角色中,在发送 Future之间是否存在任何差异-在使用线程数或线程锁定方面 code>通过以下方式将结果提供给另一个演员:
In Akka
actors, are there any differences - in terms of number of threads being used, or thread locking - between sending a Future
result to another actor by:
A。将 Future
映射到功能,以将 tell
的结果转换为演员。
A. mapping the Future
to function that tell
the result to the actor.
B。定义将来的 onSuccess
回调,将 tell
的结果发送给参与者。
B. defining an onSuccess
callback on the future, which tell
the result to the actor.
C。通过 pipeTo
将 Future
结果传递给演员。
C. piping the Future
result to the actor with pipeTo
.
在上一个问题中讨论了其中一些选项:
Some of these options are discussed in a previous question:
Akka: Send a future message to an Actor
其中三个是首选这样做的方式,为什么?
此外,我想知道,如果收到
的类型应为 Any =>单位
,然后为什么编译代码,在某些情况下,的部分函数接收
返回未来
,而不是单位
?
Also, I would like to know, if receive
should be of type Any => Unit
, then why does the code compile when in some cases the partial function of receive
returns a Future
, not Unit
?
以下是我上面提到的三个选项的代码示例:
Here is a code example of the three options that I mentioned above:
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import akka.pattern.pipe
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Success
class ActorIncrement extends Actor {
def receive = {
case i: Int =>
println(s"increment $i")
sender ! i + 1
}
}
class ActorEven extends Actor {
def receive = {
case i: Int =>
println(s"$i is even")
}
}
class ActorOdd extends Actor {
def receive = {
case i: Int =>
println(s"$i is odd")
}
}
class MyActor(actorIncrement: ActorRef, actorEven: ActorRef, actorOdd: ActorRef) extends Actor {
import scala.concurrent.ExecutionContext.Implicits.global
implicit val timeout = Timeout(5 seconds)
def receive = {
case i: Int if i % 2 == 0 =>
println(s"receive a: $i")
actorIncrement ? i map {
case j: Int =>
println(s"$j from increment a")
actorOdd ! j
}
case i: Int =>
println(s"receive b: $i")
val future: Future[Any] = actorIncrement ? i
future onSuccess {
case i: Int =>
println(s"$i from increment b")
actorEven ! i
}
case s: String =>
println(s"receive c: $s")
(actorIncrement ? s.toInt).mapTo[Int] filter(_ % 2 == 0) andThen { case Success(i: Int) => println(s"$i from increment c") } pipeTo actorEven
}
}
object TalkToActor extends App {
// Create the 'talk-to-actor' actor system
val system = ActorSystem("talk-to-actor")
val actorIncrement = system.actorOf(Props[ActorIncrement], "actorIncrement")
val actorEven = system.actorOf(Props[ActorEven], "actorEven")
val actorOdd = system.actorOf(Props[ActorOdd], "actorOdd")
val myActor = system.actorOf(Props(new MyActor(actorIncrement, actorEven, actorOdd)), "myActor")
myActor ! 2
myActor ! 7
myActor ! "11"
Thread.sleep(1000)
//shutdown system
system.terminate()
}
推荐答案
如果您看一下 pipeTo
在 akka.pattern.PipeToSupport
,
def pipeTo(recipient: ActorRef)(implicit sender: ActorRef =
Actor.noSender): Future[T] = {
future andThen {
case Success(r) ⇒ recipient ! r
case Failure(f) ⇒ recipient ! Status.Failure(f)
}
}
}
如您所见... pipeTo
与将 andThen
调用添加到没什么不同未来
,它会发送未来结果或 Status.Failure
消息给管道演员,以防您的 Future
失败。
As you can see... pipeTo
is nothing different than just adding andThen
call to your Future
which either sends the future-result or a Status.Failure
message to the piped actor in case your Future
fails.
现在主要区别在于此 Status.Failure
失败处理。如果您不使用 pipeTo
,则可以以任何想要的方式处理失败。
Now the main difference lies in this Status.Failure
failure-handling. If you are not using pipeTo
, you can handle your failure in whatever way you want to.
这篇关于Akka / Scala:映射Future与pipeTo的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!