Akka演员中的增量处理 [英] Incremental processing in an akka actor

查看:82
本文介绍了Akka演员中的增量处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一些演员需要做很长时间并且计算量很大的工作,但是计算本身可以逐步完成。因此,虽然完整的计算本身需要花费数小时才能完成,但是中间结果实际上非常有用,我希望能够响应它们的任何请求。这是我想做的伪代码:

I have actors that need to do very long-running and computationally expensive work, but the computation itself can be done incrementally. So while the complete computation itself takes hours to complete, the intermediate results are actually extremely useful, and I'd like to be able to respond to any requests of them. This is the pseudo code of what I want to do:

var intermediateResult = ...
loop {
     while (mailbox.isEmpty && computationNotFinished)
       intermediateResult = computationStep(intermediateResult)


     receive {
         case GetCurrentResult => sender ! intermediateResult
         ...other messages...
     }
 }


推荐答案

从您对Roland Kuhn答案的评论中,我认为您有一些工作(至少在块中)可以被认为是递归的。如果不是这种情况,我认为不可能有任何干净的解决方案来解决您的问题,而您将不得不处理复杂的模式匹配块。

I assume from your comment to Roland Kuhn answer that you have some work which can be considered as recursive, at least in blocks. If this is not the case, I don't think there could be any clean solution to handle your problem and you will have to deal with complicated pattern matching blocks.

如果我的假设是正确的,我将异步调度计算,并让参与者自由地回答其他消息。关键是要使用Future单声道功能并具有简单的接收块。您将必须处理三个消息(startComputation,changeState,getState)

If my assumptions are correct, I would schedule the computation asynchronously and let the actor be free to answer other messages. The key point is to use Future monadic capabilities and having a simple receive block. You would have to handle three messages (startComputation, changeState, getState)

最终将收到以下消息:

def receive {
  case StartComputation(myData) =>expensiveStuff(myData)
  case ChangeState(newstate) = this.state = newstate
  case GetState => sender ! this.state
}

然后您可以在Future上利用map方法,方法是定义您自己的递归映射:

And then you can leverage the map method on Future, by defining your own recursive map:

 def mapRecursive[A](f:Future[A], handler: A => A, exitConditions: A => Boolean):Future[A] = {
    f.flatMap {  a=>
                 if (exitConditions(a))
                   f
                 else {
                     val newFuture = f.flatMap{ a=> Future(handler(a))}
                     mapRecursive(newFuture,handler,exitConditions)
                 }

              }
  }

有了此工具,一切都会变得更加容易。如果您看下面的示例:

Once you have this tool, everything is easier. If you look to the following example :

def main(args:Array[String]){
    val baseFuture:Future[Int] = Promise.successful(64)
    val newFuture:Future[Int] = mapRecursive(baseFuture,
                                 (a:Int) => {
                                   val result = a/2
                                   println("Additional step done: the current a is " + result)
                                   result
                                 }, (a:Int) => (a<=1))

    val one = Await.result(newFuture,Duration.Inf)
    println("Computation finished, result = " + one)



  }

其输出为:


已完成其他步骤:当前a为32

Additional step done: the current a is 32

已完成其他步骤:当前a为16

Additional step done: the current a is 16

已完成其他步骤:当前a为8

Additional step done: the current a is 8

已完成其他步骤:当前租金a是4

Additional step done: the current a is 4

已完成其他步骤:当前a是2

Additional step done: the current a is 2

已完成其他步骤:当前a是1

Additional step done: the current a is 1

计算完成,结果= 1

Computation finished, result = 1

您了解可以在您的 expensiveStuff 方法

  def expensiveStuff(myData:MyData):Future[MyData]= {
    val firstResult = Promise.successful(myData)
    val handler : MyData => MyData = (myData) => {
      val result = myData.copy(myData.value/2)
      self ! ChangeState(result)
      result
    }
    val exitCondition : MyData => Boolean = (myData:MyData) => myData.value==1
    mapRecursive(firstResult,handler,exitCondition)
  }



< hr>

编辑-更详细

如果您不想阻止Actor,它以线程安全和同步的方式处理其邮箱中的消息,您唯一能做的就是让计算在不同的线程上执行。这正是高性能的非阻塞接收。

If you don't want to block the Actor, which processes messages from its mailbox in a thread-safe and synchronous manner, the only thing you can do is to get the computation executed on a different thread. This is exactly an high performance non blocking receive.

但是,您说的对,我建议的方法会带来很高的性能损失。每个步骤都是在一个不同的未来上完成的,这可能根本没有必要。因此,您可以递归处理程序以获得单线程或多线程执行。毕竟没有神奇的公式:

However, you were right in saying that the approach I propose pays a high performance penalty. Every step is done on a different future, which might be not necessary at all. You can therefore recurse the handler to obtain a single-threaded or multiple-threaded execution. There is no magic formula after all:


  • 如果要异步调度并最大程度地降低成本,则所有工作都应由一个人完成线程

  • 但是这可能阻止其他工作开始,因为如果线程池中的所有线程都被占用,则期货将排队。因此,您可能希望将操作分解为多个期货,以便即使在充分使用后也可以在完成旧工作之前安排一些新工作。

def recurseFuture[A](entryFuture: Future[A], handler: A => A, exitCondition: A => Boolean, maxNestedRecursion: Long = Long.MaxValue): Future[A] = {
        def recurse(a:A, handler: A => A, exitCondition: A => Boolean, maxNestedRecursion: Long, currentStep: Long): Future[A] = {
          if (exitCondition(a))
            Promise.successful(a)
          else
            if (currentStep==maxNestedRecursion)
              Promise.successful(handler(a)).flatMap(a => recurse(a,handler,exitCondition,maxNestedRecursion,0))
            else{
              recurse(handler(a),handler,exitCondition,maxNestedRecursion,currentStep+1)
            }
        }
        entryFuture.flatMap { a => recurse(a,handler,exitCondition,maxNestedRecursion,0) }
      }

我增强了出于测试目的,我的处理程序方法是

I have enhanced for testing purposes my handler method:

  val handler: Int => Int = (a: Int) => {
      val result = a / 2
      println("Additional step done: the current a is " + result + " on thread " + Thread.currentThread().getName)
      result
    }

方法1:自己递归处理程序,以便在a上执行所有操作

    println("Starting strategy with all the steps on the same thread")
    val deepestRecursion: Future[Int] = recurseFuture(baseFuture,handler, exitCondition)
    Await.result(deepestRecursion, Duration.Inf)
    println("Completed strategy with all the steps on the same thread")
    println("")

方法2:递归处理程序有限的深度

println("Starting strategy with the steps grouped by three")
val threeStepsInSameFuture: Future[Int] = recurseFuture(baseFuture,handler, exitCondition,3)
val threeStepsInSameFuture2: Future[Int] = recurseFuture(baseFuture,handler, exitCondition,4)
Await.result(threeStepsInSameFuture, Duration.Inf)
Await.result(threeStepsInSameFuture2, Duration.Inf)
println("Completed strategy with all the steps grouped by three")
executorService.shutdown()

这篇关于Akka演员中的增量处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆