Wait until all Future.onComplete callbacks are executed
Question
I am using the Future API from Scala 2.10.X.
Here is my use case:
object Class1 {
  def apply(f: (Int) => Future[String])(i: Int): Future[String] = {
    val start = DateTime.now
    val result = f(i)
    result.onComplete {
      case _ => println("Started at " + start + ", ended at " + DateTime.now)
    }
    result
  }
}
Pretty simple, I think: I'm adding an onComplete callback to my future. Now I'm wondering whether there is a way to add a callback for when the onComplete callback itself has finished executing; in this example, to know when the logging is done.
Let's say my result instance has three onComplete callbacks registered. Can I know when all of them have been executed? I don't think it's possible, but who knows :)
Maybe an alternative would be to call map instead of onComplete, which returns a new instance of Future:
def apply(f: (Int) => Future[String])(i: Int): Future[String] = {
  val start = DateTime.now
  f(i) map {
    case r =>
      println("Started at " + start + ", ended at " + DateTime.now)
      r
  }
}
But I am not sure it would keep the same behavior.
Just to clarify: there is only one instance of Future, and I call onComplete three times on that same instance (in my example only once, but let's say I call it N times). I want to know when all three callbacks, triggered by the completion of that same Future instance, have finished executing.
Recommended answer
If you don't want to use other mechanisms (such as a CountDownLatch), use andThen to know when your operations have completed, whether or not they succeeded and whether or not the Future itself was successful. andThen returns a new Future that completes only after its handler has run, so chained handlers execute in order:
scala> val f = Future(3)
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@4b49ca35
scala> val g = f andThen { case Success(i) => println(i) } andThen { case _ => println("All done") }
3
g: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@1939e13
All done
By contrast, if the future fails, a function passed to map isn't invoked, whereas an andThen handler still runs:
scala> val f = Future[Int](???)
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@7001619b
scala> val g = f andThen { case t => println(s"stage 1 $t") } andThen { case _ => println("All done") }
stage 1 Failure(java.util.concurrent.ExecutionException: Boxed Error)
All done
g: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@24e1e7e8
scala> val g = f map { case i => println(i) } andThen { case _ => println("All done") }
All done
g: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@5d0f75d6
scala> val g = f map { case i => println(i) } map { case _ => println("All done") }
g: scala.concurrent.Future[Unit] = scala.concurrent.impl.Promise$DefaultPromise@5aabe81f
scala> g.value
res1: Option[scala.util.Try[Unit]] = Some(Failure(java.util.concurrent.ExecutionException: Boxed Error))
Similarly, an exception thrown in a chained andThen handler doesn't break subsequent operations:
scala> val g = f andThen { case t => null.hashCode } andThen { case _ => Thread.sleep(1000L); println("All done") }
java.lang.NullPointerException
at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.applyOrElse(<console>:51)
at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.applyOrElse(<console>:51)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:431)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:430)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
g: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@3fb7bec8
scala> All done
scala> g.value
res1: Option[scala.util.Try[Int]] = Some(Success(3))
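Applied to the wrapper from the question, the andThen approach might look roughly like the sketch below. This is only an illustration, not code from the original answer: the object name Timed is made up, System.currentTimeMillis() stands in for DateTime.now so the snippet has no external dependency, and the global ExecutionContext is assumed.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical rewrite of the question's Class1 using andThen.
object Timed {
  // The returned Future completes only after the logging handler has run,
  // so anything chained on it is ordered after the log line.
  def apply(f: Int => Future[String])(i: Int): Future[String] = {
    val start = System.currentTimeMillis() // stand-in for DateTime.now
    f(i) andThen {
      case _ =>
        println("Started at " + start + ", ended at " + System.currentTimeMillis())
    }
  }
}

object TimedDemo {
  def main(args: Array[String]): Unit = {
    val result = Timed(n => Future(n.toString))(42)
    // By the time Await returns, the logging side effect has already executed.
    println(Await.result(result, 5.seconds))
  }
}
```

Unlike map, the handler here also runs when f(i) fails, and the returned Future still carries the original result or failure.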
For the unfortunate case where you need to block and wait for the callbacks:
scala> val f = Future[Int](???)
f: scala.concurrent.Future[Int] = scala.concurrent.impl.Promise$DefaultPromise@859a977
scala> import java.util.concurrent.{ CountDownLatch => CDL }
import java.util.concurrent.{CountDownLatch=>CDL}
scala> val latch = new CDL(3)
latch: java.util.concurrent.CountDownLatch = java.util.concurrent.CountDownLatch@11683e9f[Count = 3]
scala> f onComplete { _ => println(1); latch.countDown() }
1
scala> f onComplete { _ => println(2); latch.countDown() }
2
scala> f onComplete { _ => println(3); latch.countDown() }
3
scala> f onComplete { _ => latch.await(); println("All done") }
All done
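If blocking a pool thread inside a callback is a concern, a non-blocking variant of the same idea is to give each callback its own Promise and combine them with Future.sequence. The following is just a sketch under assumptions: tracked is a hypothetical helper (not part of the standard library), it only works if you control how the callbacks are registered, and the global ExecutionContext is assumed.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object CallbackTracking {
  // Hypothetical helper: registers `callback` on `f` via onComplete and returns
  // a Future that completes once that callback has finished running.
  // If the callback throws, the returned Future fails with that exception.
  def tracked[T](f: Future[T])(callback: Try[T] => Unit): Future[Unit] = {
    val done = Promise[Unit]()
    f onComplete { result => done.complete(Try(callback(result))) }
    done.future
  }

  def main(args: Array[String]): Unit = {
    val f = Future(3)
    // Three callbacks registered on the same Future instance.
    val callbacks = List(
      tracked(f)(r => println("callback 1 saw " + r)),
      tracked(f)(r => println("callback 2 saw " + r)),
      tracked(f)(r => println("callback 3 saw " + r))
    )
    // Completes after all three callbacks have run; nothing blocks in the pool.
    val allDone = Future.sequence(callbacks).map(_ => println("All done"))
    Await.ready(allDone, 5.seconds) // only so the demo JVM does not exit early
  }
}
```

The order in which the three callbacks run is not guaranteed, but "All done" is printed only after all of them have finished.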