在Akka中等待多个结果 [英] Waiting for multiple results in Akka

查看:98
本文介绍了在Akka中等待多个结果的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在Akka中等待多个参与者结果的正确方法是什么?

What is the proper way to wait for the result of multiple actors in Akka?

反应式编程原理 Coursera课程进行了具有复制键值存储的练习。在不涉及分配细节的情况下,它需要等待多个参与者的确认,然后才能表明复制已完成。

The Principles of Reactive Programming Coursera course had an exercise with a replicated key-value store. Without going into the details of the assignment, it required waiting on the acknowledgement of multiple actors before it could indicate the replication was complete.

我使用包含未完成请求的可变地图实现了任务,但是我觉得解决方案有难闻的气味。我希望有一种更好的方法来实现看似常见的情况。

I implemented the assignment using a mutable map containing the outstanding requests, but I felt the solution had a 'bad smell'. I hoped there was a better way to implement what seems like a common scenario.

为了通过保留我对练习的解决方案来维护班级荣誉代码,我有一个抽象用例来描述类似的问题。

In an attempt to uphold the classes' honor code by withholding my solution to the exercise, I have an abstract use case that describes a similar problem.

发票行项目需要计算其应纳税额。纳税义务是跨多个税务部门(例如,联邦,州,警察局)应用于该订单项的所有税项的组合。如果每个税务部门都是能够确定行项目的应纳税额的行为者,则该行项目需要所有行为者进行报告,然后才能继续报告总体应纳税额。在Akka中完成此方案的最佳/正确方法是什么?

An invoice line item needs to calculate its tax liability. The tax liability is combination of all the taxes applied to the line item across multiple taxing authorities (e.g., federal, state, police district). If each taxing authority was an actor capable of determining the tax liability of the line item, the line item would need all actors to report before it could continue report the overall tax liability. What is the best/right way to accomplish this scenario in Akka?

推荐答案

我相信您正在寻找的简化示例。它显示了像演员这样的主人如何催生一些童工,然后等待他们的所有响应,处理可能发生超时等待结果的情况。该解决方案显示了如何等待初始请求,然后在等待响应时切换到新的接收功能。它还显示了如何将状态传播到等待的接收函数中,以避免必须在实例级别具有显式的可变状态。

Here is a simplified example of what I believe you are looking for. It shows how a master like actor spawns some child workers and then waits for all of their responses, handling the situation where a timeout can occur waiting for results. The solution shows how to wait for an initial request and then switch over to a new receive function when waiting for the responses. It also shows how to propagate state into the waiting receive function to avoid having to have explicit mutable state at the instance level.

object TaxCalculator {
  sealed trait TaxType
  case object StateTax extends TaxType
  case object FederalTax extends TaxType
  case object PoliceDistrictTax extends TaxType
  val AllTaxTypes:Set[TaxType] = Set(StateTax, FederalTax, PoliceDistrictTax)

  case class GetTaxAmount(grossEarnings:Double)
  case class TaxResult(taxType:TaxType, amount:Double)  

  case class TotalTaxResult(taxAmount:Double)
  case object TaxCalculationTimeout
}

class TaxCalculator extends Actor{
 import TaxCalculator._
 import context._
 import concurrent.duration._

  def receive =  waitingForRequest

  def waitingForRequest:Receive = {
    case gta:GetTaxAmount =>
      val children = AllTaxTypes map (tt => actorOf(propsFor(tt)))
      children foreach (_ ! gta)
      setReceiveTimeout(2 seconds)
      become(waitingForResponses(sender, AllTaxTypes))
  }

  def waitingForResponses(respondTo:ActorRef, expectedTypes:Set[TaxType], taxes:Map[TaxType, Double] = Map.empty):Receive = {
    case TaxResult(tt, amount) =>
      val newTaxes = taxes ++ Map(tt -> amount)
      if (newTaxes.keySet == expectedTypes){
        respondTo ! TotalTaxResult(newTaxes.values.foldLeft(0.0)(_+_))
        context stop self
      }
      else{
        become(waitingForResponses(respondTo, expectedTypes, newTaxes))
      }

    case ReceiveTimeout =>
      respondTo ! TaxCalculationTimeout
      context stop self
  }

  def propsFor(taxType:TaxType) = taxType match{
    case StateTax => Props[StateTaxCalculator]
    case FederalTax => Props[FederalTaxCalculator]
    case PoliceDistrictTax => Props[PoliceDistrictTaxCalculator]
  }  
}

trait TaxCalculatingActor extends Actor{  
  import TaxCalculator._
  val taxType:TaxType
  val percentage:Double

  def receive = {
    case GetTaxAmount(earnings) => 
      val tax = earnings * percentage
      sender ! TaxResult(taxType, tax)
  }
}

class FederalTaxCalculator extends TaxCalculatingActor{
  val taxType = TaxCalculator.FederalTax
  val percentage = 0.20
}

class StateTaxCalculator extends TaxCalculatingActor{
  val taxType = TaxCalculator.StateTax
  val percentage = 0.10
}

class PoliceDistrictTaxCalculator extends TaxCalculatingActor{
  val taxType = TaxCalculator.PoliceDistrictTax
  val percentage = 0.05
}

然后您可以使用以下代码对其进行测试:

Then you could test this out with the following code:

import TaxCalculator._
import akka.pattern.ask
import concurrent.duration._
implicit val timeout = Timeout(5 seconds)

val system = ActorSystem("taxes")
import system._
val cal = system.actorOf(Props[TaxCalculator])
val fut = cal ? GetTaxAmount(1000.00)
fut onComplete{
  case util.Success(TotalTaxResult(amount)) =>
    println(s"Got tax total of $amount")
  case util.Success(TaxCalculationTimeout) =>
    println("Got timeout calculating tax")
  case util.Failure(ex) => 
    println(s"Got exception calculating tax: ${ex.getMessage}")
}

这篇关于在Akka中等待多个结果的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆