应对期货 [英] React for futures

查看:87
本文介绍了应对期货的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用分治法(又名分叉/联接)解决数字运算问题。下面是代码:

  import scala.actors.Futures.future 

private def compute(输入:Input):Result = {
if(pair.size< SIZE_LIMIT){
computeSequential()
} else {
val(input1,input2)= input.split
val f1 =未来(compute(input1))
val f2 =未来(compute(input2))
val result1 = f1()
val result2 = f2()
merge(result1,result2)
}
}

运行速度不错),但将来的apply方法似乎会阻塞线程,并且线程池会大大增加。并且当创建了太多线程时,计算就停滞了。



期货是否有一种 react 方法来释放线程?或实现此行为的任何其他方法?



编辑:我正在使用scala 2.8.0.final

解决方案

不要声明(应用)您的 Future s,因为这会迫使他们阻止并等待答案;如您所见,这可能导致死锁。相反,请单调使用它们来告诉他们完成后该怎么做。代替:

  val result1 = f1()
val result2 = f2()
merge(result1 ,result2)

尝试一下:



<$ p {
result1<-f1
result2<-f2
}的$ p> 收益合并(result1,result2)

其结果将是 Responder [Result] (基本上是 Future [Result] )包含合并的结果;您可以使用 respond() foreach()对最终值进行有效的处理,或者可以 map() flatMap()到另一个 Responder [T]



编辑1:



好吧,签名是 compute 函数现在必须更改为 Responder [Result] ,所以这如何影响递归调用?让我们尝试一下:

 私人定义计算(输入:输入):响应者[结果] = {
if(对.size< SIZE_LIMIT){
future(computeSequential())
}否则{
val(input1,input2)= input.split
for {
result1< ;--compute(input1)
result2<-compute(input2)
}收益合并(result1,result2)
}
}

现在,您不再需要使用将调用包装到 compute future(...),因为他们已经返回了 Responder Future )。



编辑2:



使用这种连续传递样式的一个结果是,您的顶部-级别代码-最初调用 compute 的代码都不再受阻。如果从 main()调用它,而这正是程序所要做的,那么这将是一个问题,因为现在它将仅生成一堆期货,然后立即关闭,已经完成了被告知要做的一切。您需要做的是在所有这些期货上执行 block ,但在顶层仅执行一次,并且仅对 all 的结果进行计算,没有任何中间的。



不幸的是,此 Responder compute()不再像 Future 那样具有阻塞的 apply()方法。我不确定为什么flatMapping Future s会生成通用的 Responder 而不是 Future ;这似乎是一个API错误。但是无论如何,您都应该可以自己制作:

  def声明[A](r:Responder [A] ):A = {
import java.util.concurrent.ArrayBlockingQueue
import scala.actors.Actor.actor

val q = new ArrayBlockingQueue [A](1)
//使用响应需要包装在一个actor或将来的块中
actor {r.respond(a => q.put(a))}
return q.take
}

所以现在您可以创建阻塞调用以在您的 main 这样的方法:

  val finalResult = Claim(compute(input))


I am trying to use a divide-and-conquer (aka fork/join) approach for a number crunching problem. Here is the code:

import scala.actors.Futures.future

private def compute( input: Input ):Result = {
  if( pairs.size < SIZE_LIMIT ) {
    computeSequential()
  } else {
    val (input1,input2) = input.split
    val f1 = future( compute(input1) )
    val f2 = future( compute(input2) )
    val result1 = f1()
    val result2 = f2()
    merge(result1,result2)
  }
}

It runs (with a nice speed-up) but the the future apply method seems to block a thread and the thread pool increases tremendously. And when too many threads are created, the computations is stucked.

Is there a kind of react method for futures which releases the thread ? Or any other way to achieve that behavior ?

EDIT: I am using scala 2.8.0.final

解决方案

Don't claim (apply) your Futures, since this forces them to block and wait for an answer; as you've seen this can lead to deadlocks. Instead, use them monadically to tell them what to do when they complete. Instead of:

val result1 = f1()
val result2 = f2()
merge(result1,result2)

Try this:

for {
  result1 <- f1
  result2 <- f2
} yield merge(result1, result2)

The result of this will be a Responder[Result] (essentially a Future[Result]) containing the merged results; you can do something effectful with this final value using respond() or foreach(), or you can map() or flatMap() it to another Responder[T]. No blocking necessary, just keep scheduling computations for the future!

Edit 1:

Ok, the signature of the compute function is going to have to change to Responder[Result] now, so how does that affect the recursive calls? Let's try this:

private def compute( input: Input ):Responder[Result] = {
  if( pairs.size < SIZE_LIMIT ) {
    future(computeSequential())
  } else {
    val (input1,input2) = input.split
    for {
      result1 <- compute(input1)
      result2 <- compute(input2)
    } yield merge(result1, result2)
  }
}

Now you no longer need to wrap the calls to compute with future(...) because they're already returning Responder (a superclass of Future).

Edit 2:

One upshot of using this continuation-passing style is that your top-level code--whatever calls compute originally--doesn't block at all any more. If it's being called from main(), and that's all the program does, this will be a problem, because now it will just spawn a bunch of futures and then immediately shut down, having finished everything it was told to do. What you need to do is block on all these futures, but only once, at the top level, and only on the results of all the computations, not any intermediate ones.

Unfortunately, this Responder thing that's being returned by compute() no longer has a blocking apply() method like the Future did. I'm not sure why flatMapping Futures produces a generic Responder instead of a Future; this seems like an API mistake. But in any case, you should be able to make your own:

def claim[A](r:Responder[A]):A = {
  import java.util.concurrent.ArrayBlockingQueue
  import scala.actors.Actor.actor

  val q = new ArrayBlockingQueue[A](1)
  // uses of 'respond' need to be wrapped in an actor or future block
  actor { r.respond(a => q.put(a)) } 
  return q.take
}

So now you can create a blocking call to compute in your main method like so:

val finalResult = claim(compute(input))

这篇关于应对期货的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆