如何在 akka http 路由中处理长时间运行的请求 [英] how to handle long running request in akka http route

查看:31
本文介绍了如何在 akka http 路由中处理长时间运行的请求的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 akka http 我的路由之一是通过 akka http 客户端 api 与外部服务交互,并且 httpRequest 持续运行我无法使其工作这是我的用例->我正在与 janus 服务器进行交互,并在服务器以keepAlive"或事件"进行响应后立即执行长轮询获取请求.我再次请求,因此服务器不断响应

i am using akka http one of my routes is interacting with an external service via akka http client side api and the httpRequest is continuously running i am unable to make it work here is my use case -> i am interacting with a janus server and doing a long poll get request as soon as the server responded back with an 'keepAlive' or an "event" i am requesting again and so on the server keeps on responding

所有这一切都发生在一个演员内部,我有一个 akka htttp 路由,它正在初始化第一个请求

all of this is happening inside an actor and i have an akka htttp route which is intiailising the first request

这是我的代码

final case class CreateLongPollRequest(sessionId:BigInt)
class LongPollRequestActor (config: Config) extends Actor {
  def receive = {
    case CreateLongPollRequest(sessionId) =>
      senderRef = Some(sender())
      val uri: String = "localhost:8080/" + sessionId
      val request = HttpRequest(HttpMethods.GET, uri)
      val responseFuture = Http(context.system).singleRequest(request)
      responseFuture
        .onComplete {
          case Success(res)
          Unmarshal(res.entity.toStrict(40 seconds)).value.map { result =>
              val responseStr = result.data.utf8String
              log.info("Actor LongPollRequestActor: long poll responseStr {}",responseStr)
              senderRef match {
                case Some(ref) =>
                  ref ! responseStr
                case None => log.info("Actor LongPollRequestActor: sender ref is null")
              }
            }
          case Failure(e) =>log.error(e)
          }
          }
          }


final case class JanusLongPollRequest(sessionId: BigInt)
class JanusManagerActor(childMaker: List[ActorRefFactory => ActorRef]) extends Actor {
  var senderRef: Option[akka.actor.ActorRef] = None
  val longPollRequestActor  = childMaker(1)(context)
  def receive: PartialFunction[Any, Unit] = {
    case JanusLongPollRequest(sessionId)=>
      senderRef = Some(sender)
      keepAlive(sessionId,senderRef)
}

  

    def keepAlive(sessionId:BigInt,sender__Ref: Option[ActorRef]):Unit= {
        val senderRef = sender__Ref
        val future = ask(longPollRequestActor, CreateLongPollRequest(sessionId)).mapTo[String] //.pipeTo(sender)
    if (janus.equals("keepalive")) {
                val janusRequestResponse = Future {
                  JanusSessionRequestResponse(janus = janus)
                }
                senderRef match {
                  case Some(sender_ref) =>
                    janusRequestResponse.pipeTo(sender_ref)
                }
                keepAlive(sessionId,senderRef)
              }
              else if (janus.equals("event")) {
               //some fetching of values from server 
                val janusLongPollRequestResponse = Future {
                  JanusLongPollRequestResponse(janus = janus,sender=sender, transaction=transaction,pluginData=Some(pluginData))
                }
                senderRef match {
                  case Some(sender_ref) =>
                    janusLongPollRequestResponse.pipeTo(sender_ref)
                }
                keepAlive(sessionId,senderRef)
              }



def createLongPollRequest: server.Route =
    path("create-long-poll-request") {
      post {
entity(as[JsValue]) {
          json =>
            val sessionID = json.asJsObject.fields("sessionID").convertTo[String]
        
          val future = ask(janusManagerActor, JanusLongPollRequest(sessionID)).mapTo[JanusSessionRequestResponse]
            onComplete(future) {
              case Success(sessionDetails) =>
                    log.info("janus long poll request created")
                    val jsonResponse = JsObject("longpollDetails" -> sessionDetails.toJson)
                    complete(OK, routeResponseMessage.getResponse(StatusCodes.OK.intValue, ServerMessages.JANUS_SESSION_CREATED, jsonResponse))
                
              case Failure(ex) =>
                failWith(ex)
            }
  
           }
          } 

现在上面的路由 createLongPollRequest 工作正常,我第一次可以看到响应,而在接下来的尝试中,我收到了如下的死信

now the above route createLongPollRequest worked fine for the first time I can see the response and for the next attempts i am getting a dead letter as follows

[INFO] [akkaDeadLetter][07/30/2021 12:13:53.587] [demo-Janus-ActorSystem-akka.actor.default-dispatcher-6] [akka://demo-Janus-ActorSystem/deadLetters] Message [com.ifkaar.lufz.janus.models.janus.JanusSessionRequestResponse] from Actor[akka://demo-Janus-ActorSystem/user/ActorManager/ManagerActor#-721316187] to Actor[akka://demo-Janus-ActorSystem/deadLetters] was not delivered. [4] dead letters encountered. If this is not an expected behavior then Actor[akka://demo-Janus-ActorSystem/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

可能是第一次迭代后出现问题responseFuture.pipeTo(sender()

probably this is causing the issue after the first iteration responseFuture.pipeTo(sender()

有没有办法在我的后端服务器响应时在我的 akkahttp 路由中获得响应?

IS there a way where i can get a response in my akkahttp route when ever my backend server responds?

推荐答案

Actor 应该只回复一次 CreateLongPollRequest 并且只有在它有有效数据时才应该这样做.如果投票失败,Actor 应该发出另一个投票请求.

The Actor should only reply once to the CreateLongPollRequest and it should only do this when it has valid data. If the poll fails the Actor should just issue another poll request.

如果没有演员的详细信息,很难提供更多帮助.

It is difficult to give more help without the details of the Actor.

这篇关于如何在 akka http 路由中处理长时间运行的请求的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆