在Akka中同时使用严格和流式WebSocket消息 [英] Consuming both Strict and Streamed WebSocket Messages in Akka

查看:204
本文介绍了在Akka中同时使用严格和流式WebSocket消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用Akka HTTP构建Web套接字服务。我需要处理全部到达的严格消息,以及处理在m个多帧中到达的流式消息。我使用带有handleWebSocketMessages()的路由将Web套接字的处理传递给流。我的代码看起来像这样:

I am experimenting with building a web socket service using Akka HTTP. I need to handle Strict messages that arrive in totality, as well as handle Streamed messages that arrive in m multiple frames. I am using a route with handleWebSocketMessages() to pass the handling of web sockets off to a flow. The code I have looks something like this:

val route: Route =
  get {
    handleWebSocketMessages(createFlow())
  }

def createFlow(): Flow[Message, Message, Any] = Flow[Message]
  .collect {
    case TextMessage.Strict(msg) ⇒ msg
    case TextMessage.Streamed(stream) => ??? // <= What to do here??
  }
  .via(createActorFlow())
  .map {
    case msg: String ⇒ TextMessage.Strict(msg)
  }

def createActorFlow(): Flow[String, String, Any] = {
  // Set Up Actors
  // ... (this is working)
  Flow.fromSinkAndSource(in, out)
}

我不太确定两个如何处理严格消息和流消息。我意识到我可以做这样的事情:

I am not really sure how two handle both Strict and Streamed messages. I realize I could do something like this :

  .collect {
    case TextMessage.Strict(msg) ⇒ Future.successful(msg)
    case TextMessage.Streamed(stream) => stream.runFold("")(_ + _)
  }

但是现在我的流必须处理Future [String]而不是Strings,我不确定该如何处理,尤其是因为显然我需要按顺序处理消息。

But now my stream has to handle Future[String] rather than just Strings, which I am then not sure how to handle, especially since obviously I need to handle messages in order.

看到这个akka问题,这似乎有些相关,但不完全是我所需要的(我不认为?)。

I did see this akka issue, which seems to be somewhat related, but not exactly what I need (I don't think?).

https://github.com/akka/akka/issues/20096

将提供任何帮助

推荐答案

基于以下内容的最终答案(感谢svezfaz)最终成为像这样的东西:

The ultimate answer based on the below (thanks to svezfaz) answer turned out to be something like this:

val route: Route =
  get {
    handleWebSocketMessages(createFlow())
  }

def createFlow(): Flow[Message, Message, Any] = Flow[Message]
  .collect {
    case TextMessage.Strict(msg) ⇒ 
      Future.successful(MyCaseClass(msg))
    case TextMessage.Streamed(stream) => stream
      .limit(100)                   // Max frames we are willing to wait for
      .completionTimeout(5 seconds) // Max time until last frame
      .runFold("")(_ + _)           // Merges the frames
      .flatMap(msg => Future.successful(MyCaseClass(msg)))
  }
  .mapAsync(parallelism = 3)(identity)
  .via(createActorFlow())
  .map {
    case msg: String ⇒ TextMessage.Strict(msg)
  }

def createActorFlow(): Flow[MyCaseClass, String, Any] = {
  // Set Up Actors as source and sink (not shown)
  Flow.fromSinkAndSource(in, out)
}

这篇关于在Akka中同时使用严格和流式WebSocket消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆