如何在scala中顺序执行期货 [英] how to do sequential execution of Futures in scala
问题描述
我有一个需要使用迭代器的场景,为每个项目调用一个函数 f(item) 并返回一个 Future[Unit]
.
I have this scenario where I need to use an iterator, for each of the item a function f(item) is called and returns a Future[Unit]
.
但是,我需要让每个f(item)
调用顺序执行,它们不能并行运行.
However, I need to make it that each f(item)
call is executed sequentially, they can not run in parallel.
for(item <- it)
f(item)
不起作用,因为这会并行启动所有调用.
won't work becuase this starts all the calls in parallel.
我该怎么做才能让他们按顺序来?
How do I do it so they follow in sequence?
推荐答案
如果你不介意非常本地化的 var
,你可以串行化异步处理(每个 f(item)
) 如下(flatMap
进行序列化):
If you don't mind a very localised var
, you can serialise the asynchronous processing (each f(item)
) as follows (flatMap
does the serialization):
val fSerialized = {
var fAccum = Future{()}
for(item <- it) {
println(s"Processing ${item}")
fAccum = fAccum flatMap { _ => f(item) }
}
fAccum
}
fSerialized.onComplete{case resTry => println("All Done.")}
一般来说,避免 Await
操作 - 它们会阻塞(有点破坏异步点,消耗资源,对于草率的设计,可能会死锁)
In general, avoid Await
operations - they block (kind of defeats the point of async, consumes resources and for sloppy designs, can deadlock)
酷技巧 1:
您可以通过通常的可疑对象 flatmap
将 Futures
链接在一起 - 它序列化异步操作.有什么不能做的吗?;-)
You can chain together Futures
via that usual suspect, flatmap
- it serializes asynchronous operations. Is there anything it can't do? ;-)
def f1 = Future { // some background running logic here...}
def f2 = Future { // other background running logic here...}
val fSerialized: Future[Unit] = f1 flatMap(res1 => f2)
fSerialized.onComplete{case resTry => println("Both Done: Success=" + resTry.isSuccess)}
以上都没有阻塞 - 主线程在几十纳秒内直接运行.Futures 在所有情况下都用于执行并行线程并跟踪异步状态/结果以及链接逻辑.
None of the above blocks - the main thread runs straight through in a few dozen nanoseconds. Futures are used in all cases to execute parallel threads and keep track of asynchronous state/results and to chain logic.
fSerialized
表示链接在一起的两个不同异步操作的组合.一旦 val 被评估,它就会立即启动 f1
(异步运行).f1
像任何 Future
一样运行 - 当它最终完成时,它会调用它的 onComplete
回调块.这是很酷的一点 - flatMap
将它的参数安装为 f1
onComplete 回调块 - 所以 f2
在 f1后立即启动code> 完成,没有阻塞、轮询或浪费资源使用.当
f2
完成时,fSerialized
就完成了 - 所以它运行 fSerialized.onComplete
回调块 - 打印Both Done".
fSerialized
represents a composite of two different asynchronous operations chained together. As soon as the val is evaluated, it immediately starts f1
(running asynchonously). f1
runs like any Future
- when it eventually finishes, it calls it's onComplete
callback block. Here's the cool bit - flatMap
installs it's argument as the f1
onComplete callback block - so f2
is initiated as soon as f1
completes, with no blocking, polling or wasteful resource usage. When f2
is complete, then fSerialized
is complete - so it runs the fSerialized.onComplete
callback block - printing "Both Done".
不仅如此,您还可以使用整洁的非意大利面代码将平面图链接到任意数量
Not only that, but you can chain flatmaps as much as you like with neat non-spaghetti code
f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) ...
如果您要通过 Future.onComplete 执行此操作,则必须将连续操作嵌入为嵌套的 onComplete 层:
If you were to do that via Future.onComplete, you would have to embed the successive operations as nested onComplete layers:
f1.onComplete{case res1Try =>
f2
f2.onComplete{case res2Try =>
f3
f3.onComplete{case res3Try =>
f4
f4.onComplete{ ...
}
}
}
}
没那么好.
测试证明:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
print("Waiting " + item + " seconds ...")
Console.flush
blocking{Thread.sleep((item seconds).toMillis)}
println("Done")
}
val fSerial = f(4) flatMap(res1 => f(16)) flatMap(res2 => f(2)) flatMap(res3 => f(8))
fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
酷招 2:
像这样的理解:
for {a <- aExpr; b <- bExpr; c <- cExpr; d <- dExpr} yield eExpr
不过是语法糖:
aExpr.flatMap{a => bExpr.flatMap{b => cExpr.flatMap{c => dExpr.map{d => eExpr} } } }
这是一个 flatMap 链,然后是最终的地图.
that's a chain of flatMaps, followed by a final map.
这意味着
f1 flatmap(res1 => f2) flatMap(res2 => f3) flatMap(res3 => f4) map(res4 => "Did It!")
等同于
for {res1 <- f1; res2 <- f2; res3 <- f3; res4 <- f4} yield "Did It!"
要证明的测试(继之前的测试之后):
Test to Prove (following on from previous test):
val fSerial = for {res1 <- f(4); res2 <- f(16); res3 <- f(2); res4 <- f(8)} yield "Did It!"
fSerial.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
不太酷的技巧 3:
不幸的是你不能混合迭代器和期货在同一个理解中.编译错误:
Unfortunately you can't mix iterators & futures in the same for-comprehension. Compile error:
val fSerial = {for {nextItem <- itemIterable; nextRes <- f(nextItem)} yield "Did It"}.last
嵌套 fors 带来了挑战.以下不序列化,但并行运行异步块(嵌套理解不会将后续 Future 与 flatMap/Map 链接起来,而是将其链接为 Iterable.flatMap{item => f(item)} - 不一样!)
And nesting fors creates a challenge. The following doesn't serialize, but runs async blocks in parallel (nested comprehensions don't chain subsequent Futures with flatMap/Map, but instead chains as Iterable.flatMap{item => f(item)} - not the same!)
val fSerial = {for {nextItem <- itemIterable} yield
for {nextRes <- f(nextItem)} yield "Did It"}.last
同样使用 foldLeft/foldRight 和 flatMap 也不能像你期望的那样工作 - 似乎是一个错误/限制;所有异步块都是并行处理的(所以 Iterator.foldLeft/Right
不能与 Future.flatMap
交流):
Also using foldLeft/foldRight plus flatMap doesn't work as you'd expect - seems a bug/limitation; all async blocks are processed in parallel (so Iterator.foldLeft/Right
is not sociable with Future.flatMap
):
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
print("Waiting " + item + " seconds ...")
Console.flush
blocking{Thread.sleep((item seconds).toMillis)}
println("Done")
}
val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
val empty = Future[Unit]{()}
def serialize(f1: Future[Unit], f2: Future[Unit]) = f1 flatMap(res1 => f2)
//val fSerialized = itemIterable.iterator.foldLeft(empty){(fAccum, item) => serialize(fAccum, f(item))}
val fSerialized = itemIterable.iterator.foldRight(empty){(item, fAccum) => serialize(fAccum, f(item))}
fSerialized.onComplete{case resTry => println("!!!! That's a wrap !!!! Success=" + resTry.isSuccess)}
但这有效(涉及var):
But this works (var involved):
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
import scala.concurrent.duration._
def f(item: Int): Future[Unit] = Future{
print("Waiting " + item + " seconds ...")
Console.flush
blocking{Thread.sleep((item seconds).toMillis)}
println("Done")
}
val itemIterable: Iterable[Int] = List[Int](4, 16, 2, 8)
var fSerial = Future{()}
for {nextItem <- itemIterable} fSerial = fSerial.flatMap(accumRes => f(nextItem))
这篇关于如何在scala中顺序执行期货的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!