如何等待几个期货? [英] How to wait for several Futures?
问题描述
假设我有几个期货,需要等到或中的任何一个失败或都成功。
Suppose I have several futures and need to wait until either any of them fails or all of them succeed.
例如:设3个期货: f1
, f2
, f3
。
For example: Let there are 3 futures: f1
, f2
, f3
.
-
如果
f1
成功并且f2
失败,我不等待f3
(并将 failure 返回给客户端)。
If
f1
succeeds andf2
fails I do not wait forf3
(and return failure to the client).
如果 f2
失败而 f1
和 f3
仍在运行我不等待它们(并返回失败)
If f2
fails while f1
and f3
are still running I do not wait for them (and return failure)
如果 f1
成功,然后 f2
成功,我将继续等待 f3
。
If f1
succeeds and then f2
succeeds I continue waiting for f3
.
您将如何实现?
推荐答案
您可以改为使用以下理解:
You could use a for-comprehension as follows instead:
val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}
val aggFut = for{
f1Result <- fut1
f2Result <- fut2
f3Result <- fut3
} yield (f1Result, f2Result, f3Result)
在此示例中,期货1、2和3并行启动。然后,为了理解,我们等到结果1,然后2,然后3可用。如果1或2失败,我们将不再等待3。如果所有3个都成功,则 aggFut
val将持有一个带有3个插槽的元组,对应于3个期货的结果。
In this example, futures 1, 2 and 3 are kicked off in parallel. Then, in the for comprehension, we wait until the results 1 and then 2 and then 3 are available. If either 1 or 2 fails, we will not wait for 3 anymore. If all 3 succeed, then the aggFut
val will hold a tuple with 3 slots, corresponding to the results of the 3 futures.
现在,如果您需要行为,而如果首先说fut2失败,则想停止等待,事情会变得有些棘手。在上面的示例中,您将必须等待fut1完成才能实现fut2失败。要解决该问题,您可以尝试执行以下操作:
Now if you need the behavior where you want to stop waiting if say fut2 fails first, things get a little trickier. In the above example, you would have to wait for fut1 to complete before realizing fut2 failed. To solve that, you could try something like this:
val fut1 = Future{Thread.sleep(3000);1}
val fut2 = Promise.failed(new RuntimeException("boo")).future
val fut3 = Future{Thread.sleep(1000);3}
def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
val fut = if (futures.size == 1) futures.head._2
else Future.firstCompletedOf(futures.values)
fut onComplete{
case Success(value) if (futures.size == 1)=>
prom.success(value :: values)
case Success(value) =>
processFutures(futures - value, value :: values, prom)
case Failure(ex) => prom.failure(ex)
}
prom.future
}
val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
aggFut onComplete{
case value => println(value)
}
现在这可以正常工作,但是问题出在知道哪个未来
可以在成功完成后从地图
中删除。只要您有某种方法可以将结果与产生该结果的Future正确关联,那么类似的方法就可以工作。它只是递归地不断从地图中删除完整的期货,然后在其余的期货
上调用 Future.firstCompletedOf
,直到没有左,沿途收集结果。它不是很漂亮,但是如果您确实需要您在谈论的行为,那么此操作或类似方法都可以使用。
Now this works correctly, but the issue comes from knowing which Future
to remove from the Map
when one has been successfully completed. As long as you have some way to properly correlate a result with the Future that spawned that result, then something like this works. It just recursively keeps removing completed Futures from the Map and then calling Future.firstCompletedOf
on the remaining Futures
until there are none left, collecting the results along the way. It's not pretty, but if you really need the behavior you are talking about, then this, or something similar could work.
这篇关于如何等待几个期货?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!