关于Future.firstCompletedOf和垃圾收集机制 [英] About Future.firstCompletedOf and Garbage Collect mechanism

查看:162
本文介绍了关于Future.firstCompletedOf和垃圾收集机制的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在我的真实项目中遇到过这个问题,并通过我的测试代码和分析器证明了这一点。我不是粘贴tl; dr代码,而是向您展示一张图片然后描述它。
Future.firstCompletedOf c $ c>从2 Future s得到一个结果,两者都没有共享的东西,也不关心对方。即使这是我想要解决的问题,垃圾收集器无法回收第一个 Result 对象,直到 Future code>完成

所以我很好奇背后的机制。有人可以从较低的层面解释它,或提供一些暗示让我看看。



谢谢!



PS:是因为它们共享相同的 ExecutionContext



**更新**按要求粘贴测试代码

 对象Main扩展应用程序{
println(Test start)

val超时= 30000

特性结果{
val id:Int
val str =我很短
}
类BigObject(val id:Int )延伸结果{
覆盖val str =really big str
}

def guardian = Future({
Thread.sleep(timeout)
新结果{val id = 99999}
})

def worker(i:Int)= Future({
Thread.sleep(100)
new BigObject i)范围(1,1000)){
println(round+ i)
Thread.sleep(i)
})

20)
Future.firstCompletedOf(Seq(
监护人,
工人(i)
))。map(r => p (true){
Thread.sleep(2000)
}
} $ r
b $ b


解决方案

让我们看看 firstCompletedOf 被执行:
$ b $ pre $ def firstCompletedOf [T](期货:TraversableOnce [Future [T]])(implicit执行者:ExecutionContext):Future [T] = {
val p = Promise [T]()
val completeFirst:Try [T] => Unit = p tryComplete _
foreach {_ onComplete completeFirst}
p.future
}

在执行 {futures foreach {_ onComplete completeFirst} 时,函数 {_ onComplete completeFirst} 通过 ExecutionContext.execute 保存在某处
。确切地说,这个函数保存的是不相关的,我们只知道它必须保存在
的某个地方,以便稍后可以在线程池中执行时执行它。



该函数关闭 completeFirst ,该函数在 p 上关闭。
所以只要还有一个未来(从 futures )等待完成,就有一个对 p ,它可以防止它被垃圾回收(即使到了那个时候, firstCompletedOf 已经返回,移除 p

当第一个未来完成时,它将结果保存到承诺中(通过调用 p.tryComplete code>)。
由于承诺 p 包含结果,因此至少只要 p 为如果从 futures >未来至少一次未完成,那么我们就可以看到 p
这就是为什么在所有期货完成前无法收集结果的原因。


更新
现在问题是:它可以修复吗?我认为它可以。我们所要做的就是确保第一个将来以线程安全的方式完成对p的引用,这可以通过使用AtomicReference实例来完成。例如:

  def firstCompletedOf [T](期货:TraversableOnce [Future [T]])(隐式执行器:ExecutionContext) :Future [T] = {
val p = Promise [T]()
val pref = new java.util.concurrent.atomic.AtomicReference(p)
val completeFirst:Try [T ] => Unit = {result:Try [T] =>
val promise = pref.getAndSet(null)
if(promise!= null){
promise.tryComplete(result)
}
}
期货foreach {_ onComplete completeFirst}
p.future
}

我测试过它和预期的一样,只要第一个未来完成,结果就会被垃圾回收。它应该在所有其他方面表现一致。


I've encountered this problem in my real-life project and proved by my testing code and profiler. Instead of pasting "tl;dr" code, I'm showing you a picture and then describe it.

Simply put, I'm using Future.firstCompletedOf to get a result from 2 Futures, both of which have no shared things and don't care about each other. Even though, which is the question I want to address, the Garbage Collector cannot recycle the first Result object until both of the Futures finished.

So I'm really curious about the mechanism behind this. Could someone explain it from a lower level, or provide some hint for me to look into.

Thanks!

PS: is it because they share the same ExecutionContext?

** Update ** paste test code as requested

object Main extends App{
  println("Test start")

  val timeout = 30000

  trait Result {
    val id: Int
    val str = "I'm short"
  }
  class BigObject(val id: Int) extends Result{
    override val str = "really big str"
  }

  def guardian = Future({
    Thread.sleep(timeout)
    new Result { val id = 99999 }
  })

  def worker(i: Int) = Future({
    Thread.sleep(100)
    new BigObject(i)
  })

  for (i <- Range(1, 1000)){
    println("round " + i)
    Thread.sleep(20)
    Future.firstCompletedOf(Seq(
      guardian,
      worker(i)
    )).map( r => println("result" + r.id))
  }

  while (true){
    Thread.sleep(2000)
  }
}

解决方案

Let's see how firstCompletedOf is implemented:

def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
  val p = Promise[T]()
  val completeFirst: Try[T] => Unit = p tryComplete _
  futures foreach { _ onComplete completeFirst }
  p.future
}

When doing { futures foreach { _ onComplete completeFirst }, the function { _ onComplete completeFirst } is saved somewhere via ExecutionContext.execute. Where exactly is this function saved is irrelevant, we just know that it has to be saved somewhere so that it can be picked later on and executed on a thread pool when a thread becomes available.

This function closes over completeFirst which closes over p. So as long as there is still one future (from futures) waiting to be completed, there is a reference to p that prevents it to be garbage collected (even though by that point chances are that firstCompletedOf has already returned, removing p from the stack).

When the first future completes, it saves the result into the promise (by calling p.tryComplete). Because the promise p holds the result, the result is reachable for at least as long as p is reachablle, and as we saw p is reachable as long as as at least once future from futures has not completed. This is the reason why the result cannot be collected before all the futures have completed.

UPDATE: Now the question is: could it be fixed? I think it could. All we would have to do is to ensure that the first future to complete "nulls out" the reference to p in a thread-safe way, which can be done by example using an AtomicReference. Something like this:

def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
  val p = Promise[T]()
  val pref = new java.util.concurrent.atomic.AtomicReference(p)
  val completeFirst: Try[T] => Unit = { result: Try[T] =>
    val promise = pref.getAndSet(null)
    if (promise != null) {
      promise.tryComplete(result)
    }
  }
  futures foreach { _ onComplete completeFirst }
  p.future
}

I have tested it and as expected it does allow the result to be garbage collected as soon as the first future completes. It should behave the same in all other respects.

这篇关于关于Future.firstCompletedOf和垃圾收集机制的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆