Akka consolidate concurrent database requests

Problem description

I want to make concurrent requests to multiple data repositories and consolidate the results. I am trying to understand whether my approach is valid at all, or whether there is a better way to solve this problem. I am new to Akka / Spray / Scala and want to get a better understanding of how to build these components properly. Any suggestions or tips would be greatly appreciated. I am trying to wrap my head around the use of actors and futures for this type of implementation.

Spray service:

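import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.pattern.ask
import akka.util.Timeout
import spray.routing.HttpService
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}

trait DemoService extends HttpService with Actor with ActorLogging {
  implicit val timeout = Timeout(5 seconds) // needed for `?` below
  val mongoMasterActor = context.actorOf(Props[MongoMasterActor], "mongoactor")
  val dbMaster = context.actorOf(Props[DbMasterActor], "dbactor")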

  val messageApiRouting =
        path("summary" / Segment / Segment) { (dataset, timeslice) =>
          onComplete(getDbResponses(dbMaster, dataset, timeslice)) {
            case Success(dbMessageResponse) => complete(s"The result was $dbMessageResponse")
            case Failure(ex) => complete(s"An error occurred: ${ex.getMessage}")
          }
        }

  /** Asks the given actor for the summary of a specific dataset and timeslice
    *
    * @param mongoActor an actor reference to the Mongo actor that will handle the appropriate request routing
    * @param dataset The dataset for which the summary has been requested
    * @param timeslice The timeslice (Month, Week, Day, etc.) for which the summary has been requested
    */
  def getSummary(mongoActor: ActorRef, dataset: String, timeslice: String): Future[DbMessageResponse] = {
    log.debug(s"dataset: $dataset  timeslice: $timeslice")
    val dbMessage = DbMessage("summary", dataset + timeslice)
    (mongoActor ? dbMessage).mapTo[DbMessageResponse]
  }

  def getDbResponses(dbActor: ActorRef, dataset: String, timeslice: String): Future[SummaryResponse] = {
    log.debug(s"dataset: $dataset  timeslice: $timeslice")
    val dbMessage = DbMessage("summary", dataset + timeslice)
    (dbActor ? dbMessage).mapTo[SummaryResponse]
  }

  def getSummaryPayload(mongoSummary: DbMessageResponse, redisSummary: DbMessageResponse): String = {
    mongoSummary.response + redisSummary.response
  }

}

Akka actor / Future mock database requests:

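import akka.actor.{Actor, ActorLogging, Props, Status}
import scala.concurrent.Future
import scala.util.{Failure, Success}

class DbMasterActor extends Actor with ActorLogging {

  // Execution context for the futures created in this actor
  import context.dispatcher

  //TODO: Need to add routing to the config to limit instances
  val summaryActor = context.actorOf(Props(new SummaryActor), "summaryactor")

  def receive = {

    case msg: DbMessage =>
      // Capture the sender in a local val. A shared var would be overwritten
      // by the next request before this request's future completes.
      val originalSender = sender
      msg.query match {
        case "summary" =>
          getDbResults().onComplete {
            case Success(result) => originalSender ! result
            case Failure(ex) =>
              log.error(ex, "Summary retrieval failed")
              // Fail the caller's ask instead of letting it run into its timeout
              originalSender ! Status.Failure(ex)
          }
        // Avoid a MatchError for query types that are not handled yet
        case other => log.error("Unsupported query: {}", other)
      }

    //If not match log an error
    case other => log.error("Received unknown message: {}", other)

  }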


  def getDbResults(): Future[SummaryResponse] = {
    log.debug("hitting db results")
    val mongoResult = Future{ Thread.sleep(500); "Mongo"}
    val redisResult = Future{ Thread.sleep(800); "redis"}

    for{
      mResult <- mongoResult
      rResult <- redisResult

    } yield SummaryResponse(mResult, rResult)

  }
}
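One detail in getDbResults above decides whether the two lookups really overlap: both futures are assigned to vals before the for-comprehension, so both are already running when the results are combined. Creating them inside the for-comprehension would run them one after the other. Here is a minimal sketch of that difference using only the Scala standard library. FutureOrdering and its overlaps probe are hypothetical names for illustration and are not part of the code in the question.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Stand-in for the SummaryResponse type used in the question
final case class SummaryResponse(mongo: String, redis: String)

object FutureOrdering {
  // Both futures are created, and therefore started, before the for-comprehension.
  def concurrent(mongo: () => String, redis: () => String): Future[SummaryResponse] = {
    val mongoResult = Future(mongo())
    val redisResult = Future(redis())
    for {
      m <- mongoResult
      r <- redisResult
    } yield SummaryResponse(m, r)
  }

  // The second future is created inside flatMap, so it starts only after the first completes.
  def sequential(mongo: () => String, redis: () => String): Future[SummaryResponse] =
    for {
      m <- Future(mongo())
      r <- Future(redis())
    } yield SummaryResponse(m, r)

  // Hypothetical probe: the "mongo" call waits up to 500 ms for the "redis" call
  // to start and reports whether it did.
  def overlaps(run: (() => String, () => String) => Future[SummaryResponse]): Boolean = {
    val redisStarted = new CountDownLatch(1)
    val mongo = () => blocking(redisStarted.await(500, TimeUnit.MILLISECONDS)).toString
    val redis = () => { redisStarted.countDown(); "redis" }
    Await.result(run(mongo, redis), 5.seconds).mongo == "true"
  }
}
```

FutureOrdering.overlaps(FutureOrdering.concurrent) should report that the two calls overlapped, while the sequential variant should not, because its second future does not exist until the first has finished.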


Recommended answer

After reading Effective Akka by Jamie Allen, I am going to try applying his "Cameo" pattern suggestion.

Slideshare: http://www.slideshare.net/shinolajla/effective-akka-scalaio

Github: https://github.com/jamie-allen/effective_akka

I think what I created will work, but it doesn't sound like the best approach based on Jamie's comments in his talks. I will update this post with what I have implemented (or tried to implement).

Summary actor (Cameo actor):

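import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.event.LoggingReceive
import scala.concurrent.duration._

object SummaryResponseHandler {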
  case object DbRetrievalTimeout

  def props(mongoDb: ActorRef, redisDb: ActorRef, originalSender: ActorRef): Props = {
    Props(new SummaryResponseHandler(mongoDb, redisDb, originalSender))
  }
}

class SummaryResponseHandler(mongoDb: ActorRef, redisDb: ActorRef,
                             originalSender: ActorRef) extends Actor with ActorLogging {

  import SummaryResponseHandler._
  var mongoSummary, redisSummary: Option[String] = None
  def receive = LoggingReceive {
    case MongoSummary(summary) =>
      log.debug(s"Received mongo summary: $summary")
      mongoSummary = summary
      collectSummaries
    case RedisSummary(summary) =>
      log.debug(s"Received redis summary: $summary")
      redisSummary = summary
      collectSummaries
    case DbRetrievalTimeout =>
      log.debug("Timeout occurred")
      sendResponseAndShutdown(DbRetrievalTimeout)
  }

  def collectSummaries = (mongoSummary, redisSummary) match {
    case (Some(m), Some(r)) =>
      log.debug(s"Values received for both databases")
      timeoutMessager.cancel
      sendResponseAndShutdown(DataSetSummary(mongoSummary, redisSummary))
    case _ =>
  }

  def sendResponseAndShutdown(response: Any) = {
    originalSender ! response
    log.debug("Stopping context capturing actor")
    context.stop(self)
  }

  import context.dispatcher
  val timeoutMessager = context.system.scheduler.scheduleOnce(
    250 milliseconds, self, DbRetrievalTimeout)
}

class SummaryRetriever(mongoDb: ActorRef, redisDb: ActorRef) extends Actor with ActorLogging {
  def receive = {
    case GetSummary(dataSet) =>
      log.debug("received dataSet")
      val originalSender = sender
      // No fixed name: every request gets its own handler, and a second request that arrives
      // while a handler with the same name is still alive would fail with InvalidActorNameException.
      val handler = context.actorOf(SummaryResponseHandler.props(mongoDb, redisDb, originalSender))
      mongoDb.tell(GetSummary(dataSet), handler)
      redisDb.tell(GetSummary(dataSet), handler)
    case other => log.debug(s"Unknown message: $other")
  }

}
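The handler above boils down to one rule: reply once both databases have answered, or give up when the timer fires. For comparison, here is a rough sketch of the same rule with plain futures and no actors. This is not the Cameo pattern itself. FutureConsolidation and consolidate are hypothetical names, and the message types are redefined locally as stand-ins so that the snippet compiles on its own.

```scala
import java.util.{Timer, TimerTask}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Local stand-ins for the reply types used in the answer
sealed trait SummaryResult
final case class DataSetSummary(mongo: Option[String], redis: Option[String]) extends SummaryResult
case object DbRetrievalTimeout extends SummaryResult

object FutureConsolidation {
  // Daemon timer thread, so it does not keep the JVM alive
  private val timer = new Timer("db-retrieval-timeout", true)

  // Completes with a DataSetSummary when both lookups succeed in time,
  // with DbRetrievalTimeout when the limit passes first,
  // and fails if either lookup fails before the limit.
  def consolidate(mongo: Future[String], redis: Future[String], limit: FiniteDuration)
                 (implicit ec: ExecutionContext): Future[SummaryResult] = {
    val promise = Promise[SummaryResult]()
    val timeoutTask = new TimerTask {
      def run(): Unit = { promise.trySuccess(DbRetrievalTimeout); () }
    }
    timer.schedule(timeoutTask, limit.toMillis)

    mongo.zip(redis).onComplete { outcome =>
      timeoutTask.cancel() // same role as timeoutMessager.cancel in the handler
      promise.tryComplete(outcome.map { case (m, r) => DataSetSummary(Some(m), Some(r)) })
    }
    promise.future
  }
}
```

The trade-off is that the per-request state lives in a Promise instead of a short-lived actor, so there is no actor to stop afterwards. In exchange, everything the actor version gets from supervision and from logging each message is lost.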

Common:

case class GetSummary(dataSet: String)
case class DataSetSummary(mongo: Option[String], redis: Option[String])

case class MongoSummary(summary: Option[String])

case class RedisSummary(summary: Option[String])

trait MongoProxy extends Actor with ActorLogging
trait RedisProxy extends Actor with ActorLogging

Mock stubs:

class MongoProxyStub extends MongoProxy {
  val summaryData = Map[String, String](
    "dataset1" -> "MongoData1",
    "dataset2" -> "MongoData2")

  def receive = LoggingReceive {
    case GetSummary(dataSet: String) =>
      log.debug(s"Received GetSummary for ID: $dataSet")
      summaryData.get(dataSet) match {
        case Some(data) => sender ! MongoSummary(Some(data))
        case None => sender ! MongoSummary(Some(""))
      }
  }
}

class RedisProxyStub extends RedisProxy {
  val summaryData = Map[String, String](
    "dataset1" -> "RedisData1",
    "dataset2" -> "RedisData2")

  def receive = LoggingReceive {
    case GetSummary(dataSet: String) =>
      log.debug(s"Received GetSummary for ID: $dataSet")
      summaryData.get(dataSet) match {
        case Some(data) => sender ! RedisSummary(Some(data))
        case None => sender ! RedisSummary(Some(""))
      }
  }
}

Boot (you should use tests, but I just wanted to run it from Boot):

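import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object Boot extends App {

  val system = ActorSystem("DbSystem")

  val mongoProxy = system.actorOf(Props[MongoProxyStub], "cameo-success-mongo")
  val redisProxy = system.actorOf(Props[RedisProxyStub], "cameo-success-redis")
  // Argument order matches SummaryRetriever(mongoDb, redisDb)
  val summaryRetrieverActor = system.actorOf(Props(new SummaryRetriever(mongoProxy, redisProxy)), "cameo-retriever1")

  implicit val timeout = Timeout(5 seconds)
  val future = summaryRetrieverActor ? GetSummary("dataset1")
  // The handler replies with DbRetrievalTimeout instead of a DataSetSummary
  // when a proxy is too slow, so match on the reply rather than casting it.
  Await.result(future, timeout.duration) match {
    case DataSetSummary(mongo, redis) =>
      println(mongo)
      println(redis)
    case other => println(s"No summary received: $other")
  }

  system.shutdown()

}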

Application config:

akka.loglevel = "DEBUG"
akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
akka.actor.debug.autoreceive = on
akka.actor.debug.lifecycle = on
akka.actor.debug.receive = on
akka.actor.debug.event-stream = on
