I've encountered this problem in a real-life project and confirmed it with my test code and a profiler. Instead of pasting a wall of code, I'm showing you a picture and then describing it.
Simply put, I'm using Future.firstCompletedOf to get a result from two Futures, which share nothing and don't care about each other. Even so, and this is the question I want to address, the garbage collector cannot recycle the first Result object until both of the Futures have finished.
So I'm really curious about the mechanism behind this. Could someone explain it from a lower level, or give me some hints on what to look into?
Thanks!
PS: is it because they share the same ExecutionContext?
Update: test code pasted as requested.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Main extends App {
  println("Test start")

  val timeout = 30000

  trait Result {
    val id: Int
    val str = "I'm short"
  }

  // Larger footprint, so its retention is easy to spot in a profiler.
  class BigObject(val id: Int) extends Result {
    override val str = "really big str"
  }

  // Never wins the race: it sleeps far longer than any worker.
  def guardian = Future {
    Thread.sleep(timeout)
    new Result { val id = 99999 }
  }

  // Completes quickly with a fresh BigObject.
  def worker(i: Int) = Future {
    Thread.sleep(100)
    new BigObject(i)
  }

  for (i <- Range(1, 1000)) {
    println("round " + i)
    Thread.sleep(20)
    Future.firstCompletedOf(Seq(
      guardian,
      worker(i)
    )).map(r => println("result " + r.id))
  }

  // Keep the JVM alive so the heap can be inspected.
  while (true) {
    Thread.sleep(2000)
  }
}
Solution
Let's see how firstCompletedOf is implemented:
def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
  val p = Promise[T]()
  val completeFirst: Try[T] => Unit = p tryComplete _
  futures foreach { _ onComplete completeFirst }
  p.future
}
When futures foreach { _ onComplete completeFirst } runs, the completeFirst function is saved somewhere via ExecutionContext.execute. Where exactly it is saved is irrelevant; we just know that it has to be saved somewhere so that it can be picked up later and executed on a thread pool when a thread becomes available. And this function closes over p.
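To see why a pending callback pins p, here is a toy sketch of that mechanism (the names and structure are my own illustration; the real scala.concurrent internals are more elaborate): an incomplete future stores its registered callbacks in a field and only drops them when it completes.

import scala.concurrent.ExecutionContext
import scala.util.Try

class ToyFuture[T](executor: ExecutionContext) {
  private var result: Option[Try[T]] = None
  private var callbacks: List[Try[T] => Unit] = Nil

  def onComplete(f: Try[T] => Unit): Unit = synchronized {
    result match {
      case Some(r) => dispatch(f, r)
      // Not completed yet: f (and everything it captures, e.g.
      // completeFirst -> p) is now strongly reachable from this future.
      case None    => callbacks ::= f
    }
  }

  def complete(r: Try[T]): Unit = synchronized {
    result = Some(r)
    callbacks.foreach(dispatch(_, r))
    callbacks = Nil // the references are only dropped here, on completion
  }

  private def dispatch(f: Try[T] => Unit, r: Try[T]): Unit =
    executor.execute(new Runnable { def run(): Unit = f(r) })
}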
So as long as there is still one future (from futures) waiting to be completed, there is a reference to p that prevents it from being garbage collected (even though by that point chances are that firstCompletedOf has already returned, removing p from the stack).
When the first future completes, it saves the result into the promise (by calling p.tryComplete).
Because the promise p holds the result, the result is reachable for at least as long as p is reachable, and as we saw, p is reachable as long as at least one future from futures has not completed.
This is the reason why the result cannot be collected before all the futures have completed.
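To make the reference chain visible, here is a small self-contained experiment (my own sketch, not from the original post; the sizes and timings are arbitrary). It keeps only a WeakReference to the winning result and checks whether GC can clear it while the losing future is still sleeping:

import java.lang.ref.WeakReference
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ReachabilityDemo extends App {
  var payload: Array[Byte] = new Array[Byte](32 * 1024 * 1024)
  val weak = new WeakReference(payload)

  Await.ready(
    Future.firstCompletedOf(Seq(
      Future { blocking(Thread.sleep(10000)); new Array[Byte](0) }, // slow loser
      Future { payload }                                            // fast winner
    )),
    1.second)

  payload = null // drop our own strong reference to the array
  System.gc()    // only a hint; the outcome is indicative, not guaranteed

  // The promise inside firstCompletedOf is still referenced by the slow
  // future's pending callback, and it holds the result, so this typically
  // prints "still reachable".
  println(if (weak.get == null) "collected" else "still reachable")
}

Once the slow future finally completes and its callback is dropped, nothing references p anymore and the array becomes collectable.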
UPDATE:
Now the question is: could it be fixed? I think it could. All we would have to do is ensure that the first future to complete "nulls out" the reference to p in a thread-safe way, which can be done, for example, using an AtomicReference. Something like this:
def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
  val p = Promise[T]()
  val pref = new java.util.concurrent.atomic.AtomicReference(p)
  val completeFirst: Try[T] => Unit = { result: Try[T] =>
    val promise = pref.getAndSet(null)
    if (promise != null) {
      promise.tryComplete(result)
    }
  }
  futures foreach { _ onComplete completeFirst }
  p.future
}
I have tested it and, as expected, it does allow the result to be garbage collected as soon as the first future completes. It should behave the same in all other respects.
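For completeness, the claim can be checked with the same weak-reference experiment as above, under the same assumptions, but calling the patched definition (here firstCompletedOf is assumed to resolve to the version just shown, not to Future.firstCompletedOf):

import java.lang.ref.WeakReference
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PatchedDemo extends App {
  var payload: Array[Byte] = new Array[Byte](32 * 1024 * 1024)
  val weak = new WeakReference(payload)

  // firstCompletedOf refers to the patched definition above.
  Await.ready(
    firstCompletedOf(Seq(
      Future { blocking(Thread.sleep(10000)); new Array[Byte](0) }, // slow loser
      Future { payload }                                            // fast winner
    )),
    1.second)

  payload = null // drop our own strong reference to the array
  System.gc()    // only a hint; the outcome is indicative, not guaranteed

  // The losing future's callback now only reaches a nulled-out
  // AtomicReference, so nothing pins the promise or its result anymore:
  // this typically prints "collected".
  println(if (weak.get == null) "collected" else "still reachable")
}

The getAndSet(null) also guarantees that tryComplete is invoked at most once, so the patched version keeps the first-wins semantics of the original.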