scala Future processing depth-first not breadth-first

Question

I have a large computation roughly based on the following pattern:

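import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global

def f1(i:Int):Int = ???
def f2(i:Int):Int = ???

// starts one Future per item, all at once
def processA(l: List[Int]): List[Future[Int]] =
  l.map(i => Future(f1(i)))

// chains f2 onto each of processA's Futures
def processB(l: List[Int]): List[Future[Int]] = {
  val p = processA(l)
  p.map(fut => fut.map(f2))
}

def main() = {
  val items = List( /* 1k to 10k items here */ )
  val results = processB(items)
  results.foreach(_.onComplete { result => /* ... */ })
}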

The problem I encounter, if my understanding is correct, is that the processing is breadth-first. processA starts thousands of Futures, and processB then enqueues thousands of new Futures that are only processed after those of processA have finished. As a result, the onComplete callbacks start firing very late...
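To make that ordering visible, here is a minimal sketch (not from the question) that runs the same processA/processB shape on a hypothetical single-threaded ExecutionContext, QueueEC, which queues tasks first-in first-out and only runs them when drain() is called. The dummy f1/f2 just record when they run. Because processA enqueues every f1 task before any of them has finished, each f2 task can only be queued behind all the remaining f1 tasks.

```scala
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical FIFO ExecutionContext: tasks are queued and only run when
// drain() is called on the current thread, so the order is deterministic.
final class QueueEC extends ExecutionContext {
  private val queue = mutable.Queue.empty[Runnable]
  def execute(task: Runnable): Unit = queue.enqueue(task)
  def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  def drain(): Unit = while (queue.nonEmpty) queue.dequeue().run()
}

object BreadthFirstDemo {
  // Returns the order in which f1/f2 ran for items 1..n
  def run(n: Int): Vector[String] = {
    implicit val ec: QueueEC = new QueueEC
    val log = mutable.ArrayBuffer.empty[String]

    // Dummy stand-ins for the question's f1/f2: identity plus a log entry
    def f1(i: Int): Int = { log += s"f1($i)"; i }
    def f2(i: Int): Int = { log += s"f2($i)"; i }

    // Same shape as the question's processA/processB
    def processA(l: List[Int]): List[Future[Int]] = l.map(i => Future(f1(i)))
    def processB(l: List[Int]): List[Future[Int]] = processA(l).map(_.map(f2))

    processB((1 to n).toList)
    ec.drain() // run everything that was queued, in FIFO order
    log.toVector
  }
}
```

For example, BreadthFirstDemo.run(3) records every f1 call before the first f2 call. On a real multi-threaded pool the interleaving is not deterministic; this sketch only isolates the effect of queueing order.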

I would like to make this depth-first: a few of processA's Futures start, and then processB continues from there instead of switching to something else in the queue.

Can this be done in vanilla Scala? Or should I turn to a library that offers an alternative to Futures and thread pools?

EDIT: a bit more detail. Rewriting this as f1 andThen f2, as suggested in the answers, is not practical at this time: processA and processB actually do a number of other things (including side effects), and the fact that processB relies on processA is private. Exposing it would break separation of concerns (SoC).

EDIT 2: I think I'm going to relax the "vanilla" constraint a bit. Someone suggested that Akka Streams would help. I'm currently looking at scalaz.Task: does anyone have an opinion?

Recommended answer

I wasn't 100% sure I understood the question: since processB (f2) runs on the results of processA (f1), you cannot call f2 on values that f1 has not computed yet. My answer is therefore based on the following assumptions:

  • You want to limit work-in-progress
  • You want to execute f2 immediately after f1

So here's one solution to that:

import scala.concurrent._
def process(noAtATime: Int, l: List[Int])(transform: Int => Int)(implicit ec: ExecutionContext): Future[List[Int]] = {
  // define an inner async "loop" to process one chunk of numbers at a time
  def batched(i: Future[Iterator[List[Int]]], result: List[List[Int]]): Future[List[Int]] =
    i flatMap { it =>
      // if there are more chunks to process
      // we process all numbers in the chunk as parallel as possible,
      // then combine the results into a List again, then when all are done,
      // we recurse via flatMap+batched with the iterator
      // when we have no chunks left, then we un-chunk the results
      // reassemble it into the original order and return the result
      if(it.hasNext) Future.traverse(it.next())(n => Future(transform(n))).flatMap(re => batched(i, re :: result))
      else Future.successful(result.reverse.flatten) // Optimize this as needed
    }
  // Start the async "loop" over chunks of input and with an empty result
  batched(Future.successful(l.grouped(noAtATime)), List.empty)
}


scala> def f1(i: Int) = i * 2 // Dummy impl to prove it works
f1: (i: Int)Int

scala> def f2(i: Int) = i + 1 // Dummy impl to prove it works
f2: (i: Int)Int

scala> process(noAtATime = 100, (1 to 10000).toList)(n => f2(f1(n)))(ExecutionContext.global)
res0: scala.concurrent.Future[List[Int]] = Future(<not completed>)

scala> res0.foreach(println)(ExecutionContext.global)

scala> List(3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49, 51, 53, 55, 57, 59, 61, 63, 65, 67, 69, 71, 73, 75, 77, 79, 81, 83, 85, 87, 89, 91, 93, 95, 97, 99, 101, 103, 105, 107, 109, 111, 113, 115, 117, 119 …
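If processA and processB must stay separate (see the question's EDIT), the same chunking idea can be sketched on the caller's side instead: run processB on one chunk at a time and only start the next chunk once every Future of the previous chunk has completed. This is an assumption-laden variant, not part of the answer above; QueueEC, ChunkedDemo and the logging f1/f2 are hypothetical, and QueueEC is the same kind of deterministic FIFO executor used only to make the ordering observable.

```scala
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical FIFO ExecutionContext that runs queued tasks only on drain()
final class QueueEC extends ExecutionContext {
  private val queue = mutable.Queue.empty[Runnable]
  def execute(task: Runnable): Unit = queue.enqueue(task)
  def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
  def drain(): Unit = while (queue.nonEmpty) queue.dequeue().run()
}

object ChunkedDemo {
  // Returns (order in which f1/f2 ran, final result if completed)
  def run(items: List[Int], chunkSize: Int): (Vector[String], Option[Vector[Int]]) = {
    implicit val ec: QueueEC = new QueueEC
    val log = mutable.ArrayBuffer.empty[String]

    // Dummy f1/f2 and unchanged processA/processB, as in the question
    def f1(i: Int): Int = { log += s"f1($i)"; i }
    def f2(i: Int): Int = { log += s"f2($i)"; i }
    def processA(l: List[Int]): List[Future[Int]] = l.map(i => Future(f1(i)))
    def processB(l: List[Int]): List[Future[Int]] = processA(l).map(_.map(f2))

    // processB is only called for the next chunk inside flatMap, i.e. after
    // all Futures of the previous chunk have completed
    val all: Future[Vector[Int]] =
      items.grouped(chunkSize).foldLeft(Future.successful(Vector.empty[Int])) { (acc, chunk) =>
        acc.flatMap(done => Future.sequence(processB(chunk)).map(done ++ _))
      }

    ec.drain()
    (log.toVector, all.value.flatMap(_.toOption))
  }
}
```

With ChunkedDemo.run(List(1, 2, 3, 4), 2), work in progress is bounded by the chunk size: processing is still breadth-first inside a chunk, but no f1 of the second chunk starts before every f2 of the first chunk has run.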

If you are willing and able to use a library which is better suited for the problem at hand, have a look at this reply
