如何等待几个期货 [英] How to wait for several Futures

查看:129
本文介绍了如何等待几个期货的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我有几个未来,需要等待 任何一个失败,所有这些都成功了。

Suppose I have several futures and need to wait until either any of them fails or all of them succeed.

例如:假设有3个期货: f1 f2 f3

For example: Let there are 3 futures: f1, f2, f3.


  • 如果 f1 成功且 f2 失败我不等待 f3 (并返回失败给客户端)。

  • If f1 succeeds and f2 fails I do not wait for f3 (and return failure to the client).

如果 f2 失败, f1 f3 仍在运行我不等待他们(并返回失败

If f2 fails while f1 and f3 are still running I do not wait for them (and return failure)

如果 f1 成功,然后 f2 成功,我继续等待 f3

If f1 succeeds and then f2 succeeds I continue waiting for f3.

如何实现?

推荐答案

您可以使用如下的for-comprehension:

You could use a for-comprehension as follows instead:

val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}

val aggFut = for{
  f1Result <- fut1
  f2Result <- fut2
  f3Result <- fut3
} yield (f1Result, f2Result, f3Result)

在此示例中,期货1,2和3并行启动。然后,在for comprehension中,我们等待直到结果1,然后2,然后3可用。如果1或2失败,我们不会再等待3。如果所有3个成功,则 aggFut val将持有具有3个槽的元组,对应于3个期货的结果。

In this example, futures 1, 2 and 3 are kicked off in parallel. Then, in the for comprehension, we wait until the results 1 and then 2 and then 3 are available. If either 1 or 2 fails, we will not wait for 3 anymore. If all 3 succeed, then the aggFut val will hold a tuple with 3 slots, corresponding to the results of the 3 futures.

现在如果你需要的行为,你想停止等待,如果说fut2首先失败,事情变得有点棘手。在上面的例子中,你必须等待fut1完成,然后才能实现fut2失败。要解决这个问题,你可以尝试这样:

Now if you need the behavior where you want to stop waiting if say fut2 fails first, things get a little trickier. In the above example, you would have to wait for fut1 to complete before realizing fut2 failed. To solve that, you could try something like this:

  val fut1 = Future{Thread.sleep(3000);1}
  val fut2 = Promise.failed(new RuntimeException("boo")).future
  val fut3 = Future{Thread.sleep(1000);3}

  def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
    val fut = if (futures.size == 1) futures.head._2
    else Future.firstCompletedOf(futures.values)

    fut onComplete{
      case Success(value) if (futures.size == 1)=> 
        prom.success(value :: values)

      case Success(value) =>
        processFutures(futures - value, value :: values, prom)

      case Failure(ex) => prom.failure(ex)
    }
    prom.future
  }

  val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
  aggFut onComplete{
    case value => println(value)
  }

现在这个工作正常,但问题来自知道未来可在成功完成后从 Map 中删除​​。只要你有一些方法正确地关联一个结果与未来产生的结果,那么这样的东西工作。它只是递归地从映射中删除完成的Futures,然后在剩余的 Futures 上调用 Future.firstCompletedOf ,直到没有左,收集结果沿途。这不漂亮,但如果你真的需要你正在谈论的行为,那么这或类似的东西可以工作。

Now this works correctly, but the issue comes from knowing which Future to remove from the Map when one has been successfully completed. As long as you have some way to properly correlate a result with the Future that spawned that result, then something like this works. It just recursively keeps removing completed Futures from the Map and then calling Future.firstCompletedOf on the remaining Futures until there are none left, collecting the results along the way. It's not pretty, but if you really need the behavior you are talking about, then this, or something similar could work.

这篇关于如何等待几个期货的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆