如何与Akka演员解决这个问题? [英] How to solve this with Akka actors?

查看:89
本文介绍了如何与Akka演员解决这个问题?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

不知道如何命名该线程,但将尝试在几行中解释该问题。

didn't know how to name this thread but will try to explain the problem in few lines.

我有一条命令需要计算所需日期的价格范围。为了计算它,系统需要分别获取每天的价格(数据库,配置,缓存,与从何处无关)。

I have a command which need to calculate price for desired date range. To calculate it system needs to fetch the price for every day individually (DB, config, cache, it doesn't matter from where).

我的建议是拥有一个PriceRangeActor将拥有一个DailyPriceActors池,并将向他们发送CalculateDailyPrice之类的命令。

My suggestion was to have one PriceRangeActor which will have a pool of DailyPriceActors and will send them commands like CalculateDailyPrice.

但是如何在PriceRanceActor中组装所有这些数据?

But how to assemble all that data in PriceRanceActor?

1。
拥有一些带有复杂键的大地图会闻到很多东西。然后如何确定范围是否已完全计算?有没有更简便的方法?

1. Having some big map with complex keys just smells a lot. How to determine then if the range is completely calculated? Is there any easier way of doing this?

2。
为每个命令创建新的PriceRangeActor并使用Ask模式查询DailyPriceActors列表吗?

2. Create new PriceRangeActor for every command and use ask pattern to query the list of DailyPriceActors?

推荐答案

因为您不在?利用任何消息传递/排队,我建议使用Future而不是Actors作为您的并发抽象机制。 博客条目提出了一个非常有说服力的论点,即参与者是针对国家的,而期货是针对国家的。

Because you aren't utilizing any message passing/queuing I suggest Futures rather than Actors as your concurrency abstraction mechanism. This blog entry makes a very compelling argument that Actors are for state and Futures are for computation.

对于Futures或Actor (这是Future),您可以使用 Future.sequence 将所有单独的查询期货捆绑在一起,成为一个仅在所有子查询完成后才完成的期货。

With either Futures or Actor ? (which is a Future) you can use Future.sequence to bundle together all of the separate querying Futures into a single Future that only completes once all the sub-queries are complete.

使用期货(推荐)

import scala.concurrent.Future

object Foo extends App {

  type Date = Int
  type Prices = Seq[Float]
  type PriceMap = Map[Date, Prices]

  //expensive query function
  def fetchPrices(date : Date) : Prices = ???

  //the Dates to query Prices for
  val datesToQuery : Seq[Date] = ???

  import scala.concurrent.ExecutionContext.Implicits._

  def concurrentQuery(date : Date) : Future[Prices] = Future {fetchPrices(date)}      

  //launches a Future per date query, D Dates => D Futures
  //Future.sequence converts the D Futures into 1 Future
  val dates2PricesFuture : Future[PriceMap] = 
    Future.sequence(datesToQuery map concurrentQuery)
          .map(datesToQuery zip _)  
          .map(_.toMap)      

  dates2PricesFuture onSuccess { case priceMap : PriceMap =>
    //process the price data which is now completely available
  }

}//end object Foo

使用方法

import scala.concurrent.Future
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

object Foo extends App {          
  type Date = Int
  type Prices = Seq[Float]
  type PriceMap = Map[Date, Prices]

  def fetchPrices(date : Date) : Prices = ???

  val datesToQuery : Seq[Date] = ???

  class QueryActor() extends Actor {
    def receive = { case date : Date => sender ! fetchPrices(date) }
  }

  implicit val as = ActorSystem()
  implicit val queryTimeout = Timeout(1000)

  import as.dispatcher

  def concurrentQuery(date : Date) : Future[Prices] =    
    ask(as actorOf Props[QueryActor],date).mapTo[Prices]

  val dates2PricesFuture : Future[PriceMap] = 
        Future.sequence(datesToQuery map concurrentQuery)
              .map(datesToQuery zip _)  
              .map(_.toMap)

  dates2PricesFuture onSuccess ... //same as first example

}//end object Foo

这篇关于如何与Akka演员解决这个问题?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆