如何等待多个Future? [英] How to wait for several Futures?

查看:77
本文介绍了如何等待多个Future?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设我有几个未来,需要等到要么它们中的任何一个失败它们都成功.

Suppose I have several futures and need to wait until either any of them fails or all of them succeed.

例如:假设有 3 个期货:f1f2f3.

For example: Let there are 3 futures: f1, f2, f3.

  • 如果 f1 成功而 f2 失败,我不会等待 f3(并返回 failure给客户).

  • If f1 succeeds and f2 fails I do not wait for f3 (and return failure to the client).

如果 f2 失败而 f1f3 仍在运行,我不会等待它们(并返回 failure)

If f2 fails while f1 and f3 are still running I do not wait for them (and return failure)

如果 f1 成功,然后 f2 成功,我继续等待 f3.

If f1 succeeds and then f2 succeeds I continue waiting for f3.

您将如何实施它?

推荐答案

你可以使用 for-comprehension 来代替:

You could use a for-comprehension as follows instead:

val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}

val aggFut = for{
  f1Result <- fut1
  f2Result <- fut2
  f3Result <- fut3
} yield (f1Result, f2Result, f3Result)

在本例中,期货 1、2 和 3 并行启动.然后,在for comprehension中,我们等到结果1,然后2,然后3可用.如果 1 或 2 失败,我们将不再等待 3.如果所有 3 个都成功,则 aggFut val 将保存一个具有 3 个槽的元组,对应于 3 个期货的结果.

In this example, futures 1, 2 and 3 are kicked off in parallel. Then, in the for comprehension, we wait until the results 1 and then 2 and then 3 are available. If either 1 or 2 fails, we will not wait for 3 anymore. If all 3 succeed, then the aggFut val will hold a tuple with 3 slots, corresponding to the results of the 3 futures.

现在,如果您需要在 say fut2 首先失败时停止等待的行为,事情会变得有点棘手.在上面的示例中,您必须等待 fut1 完成才能意识到 fut2 失败.要解决此问题,您可以尝试以下操作:

Now if you need the behavior where you want to stop waiting if say fut2 fails first, things get a little trickier. In the above example, you would have to wait for fut1 to complete before realizing fut2 failed. To solve that, you could try something like this:

  val fut1 = Future{Thread.sleep(3000);1}
  val fut2 = Promise.failed(new RuntimeException("boo")).future
  val fut3 = Future{Thread.sleep(1000);3}

  def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
    val fut = if (futures.size == 1) futures.head._2
    else Future.firstCompletedOf(futures.values)

    fut onComplete{
      case Success(value) if (futures.size == 1)=> 
        prom.success(value :: values)

      case Success(value) =>
        processFutures(futures - value, value :: values, prom)

      case Failure(ex) => prom.failure(ex)
    }
    prom.future
  }

  val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
  aggFut onComplete{
    case value => println(value)
  }

现在这可以正常工作,但问题来自于知道在成功完成后从 Map 中删除哪个 Future.只要您有某种方法可以将结果与产生该结果的 Future 正确关联,那么这样的事情就可以工作.它只是递归地不断从 Map 中删除已完成的 Futures,然后在剩余的 Futures 上调用 Future.firstCompletedOf 直到没有剩下的,沿途收集结果.这并不漂亮,但如果您真的需要您所谈论的行为,那么这或类似的东西可能会奏效.

Now this works correctly, but the issue comes from knowing which Future to remove from the Map when one has been successfully completed. As long as you have some way to properly correlate a result with the Future that spawned that result, then something like this works. It just recursively keeps removing completed Futures from the Map and then calling Future.firstCompletedOf on the remaining Futures until there are none left, collecting the results along the way. It's not pretty, but if you really need the behavior you are talking about, then this, or something similar could work.

这篇关于如何等待多个Future?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆