在Akka中同时使用严格和流式WebSocket消息 [英] Consuming both Strict and Streamed WebSocket Messages in Akka
问题描述
我正在尝试使用Akka HTTP构建Web套接字服务。我需要处理全部到达的严格消息,以及处理在m个多帧中到达的流式消息。我使用带有handleWebSocketMessages()的路由将Web套接字的处理传递给流。我的代码看起来像这样:
I am experimenting with building a web socket service using Akka HTTP. I need to handle Strict messages that arrive in totality, as well as handle Streamed messages that arrive in m multiple frames. I am using a route with handleWebSocketMessages() to pass the handling of web sockets off to a flow. The code I have looks something like this:
val route: Route =
get {
handleWebSocketMessages(createFlow())
}
def createFlow(): Flow[Message, Message, Any] = Flow[Message]
.collect {
case TextMessage.Strict(msg) ⇒ msg
case TextMessage.Streamed(stream) => ??? // <= What to do here??
}
.via(createActorFlow())
.map {
case msg: String ⇒ TextMessage.Strict(msg)
}
def createActorFlow(): Flow[String, String, Any] = {
// Set Up Actors
// ... (this is working)
Flow.fromSinkAndSource(in, out)
}
我不太确定两个如何处理严格消息和流消息。我意识到我可以做这样的事情:
I am not really sure how two handle both Strict and Streamed messages. I realize I could do something like this :
.collect {
case TextMessage.Strict(msg) ⇒ Future.successful(msg)
case TextMessage.Streamed(stream) => stream.runFold("")(_ + _)
}
但是现在我的流必须处理Future [String]而不是Strings,我不确定该如何处理,尤其是因为显然我需要按顺序处理消息。
But now my stream has to handle Future[String] rather than just Strings, which I am then not sure how to handle, especially since obviously I need to handle messages in order.
看到这个akka问题,这似乎有些相关,但不完全是我所需要的(我不认为?)。
I did see this akka issue, which seems to be somewhat related, but not exactly what I need (I don't think?).
https://github.com/akka/akka/issues/20096
将提供任何帮助
推荐答案
基于以下内容的最终答案(感谢svezfaz)最终成为像这样的东西:
The ultimate answer based on the below (thanks to svezfaz) answer turned out to be something like this:
val route: Route =
get {
handleWebSocketMessages(createFlow())
}
def createFlow(): Flow[Message, Message, Any] = Flow[Message]
.collect {
case TextMessage.Strict(msg) ⇒
Future.successful(MyCaseClass(msg))
case TextMessage.Streamed(stream) => stream
.limit(100) // Max frames we are willing to wait for
.completionTimeout(5 seconds) // Max time until last frame
.runFold("")(_ + _) // Merges the frames
.flatMap(msg => Future.successful(MyCaseClass(msg)))
}
.mapAsync(parallelism = 3)(identity)
.via(createActorFlow())
.map {
case msg: String ⇒ TextMessage.Strict(msg)
}
def createActorFlow(): Flow[MyCaseClass, String, Any] = {
// Set Up Actors as source and sink (not shown)
Flow.fromSinkAndSource(in, out)
}
这篇关于在Akka中同时使用严格和流式WebSocket消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!