从服务器关闭 akka-http websocket 连接 [英] Close akka-http websocket connection from server

查看:29
本文介绍了从服务器关闭 akka-http websocket 连接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在我的场景中,客户端发送再见"websocket 消息,我需要关闭之前在服务器端建立的连接.

In my scenario, a client sends "goodbye" websocket message and I need to close previously established connection at the server side.

来自 akka-http 文档:

From akka-http docs:

通过取消来自服务器逻辑的传入连接流(例如,通过将其下游连接到 Sink.cancelled 并将其上游连接到 Source.empty),可以关闭连接.也可以通过取消 IncomingConnection 源连接来关闭服务器的套接字.

Closing connections is possible by cancelling the incoming connection Flow from your server logic (e.g. by connecting its downstream to a Sink.cancelled and its upstream to a Source.empty). It is also possible to shut down the server's socket by cancelling the IncomingConnection source connections.

但考虑到 SinkSource 在协商新连接时设置一次,我不清楚如何做到这一点:

But it's not clear to me how to do that taking into account that Sink and Source are set once when negotiating a new connection:

(get & path("ws")) {
  optionalHeaderValueByType[UpgradeToWebsocket]() {
    case Some(upgrade) ⇒
      val connectionId = UUID()
      complete(upgrade.handleMessagesWithSinkSource(sink, source))
    case None ⇒
      reject(ExpectedWebsocketRequestRejection)
  }
}

推荐答案

提示:此答案基于 akka-stream-experimental 版本 2.0-M2.API在其他版本中可能略有不同.

HINT: This answer is based on akka-stream-experimental version 2.0-M2. The API may be slightly different in other versions.

关闭连接的一种简单方法是使用 PushStage:

An easy way to close the connection is by using a PushStage:

import akka.stream.stage._

val closeClient = new PushStage[String, String] {
  override def onPush(elem: String, ctx: Context[String]) = elem match {
    case "goodbye" ⇒
      // println("Connection closed")
      ctx.finish()
    case msg ⇒
      ctx.push(msg)
  }
}

在客户端或服务器端接收的每个元素(以及通常通过 Flow 的每个元素)都通过这样的 Stage 组件.在 Akka 中,完整的抽象被称为 GraphStage,更多信息可以在 官方文档.

Every element that is received at the client side or at the server side (and in general every element that goes through a Flow) goes through such a Stage component. In Akka, the full abstraction is called GraphStage, more information can be found in the official documentation.

使用 PushStage,我们可以查看具体传入元素的值,然后相应地转换上下文.在上面的例子中,一旦收到 goodbye 消息,我们就完成上下文,否则我们只是通过 push 方法转发值.

With a PushStage we can watch concrete incoming elements for their value and than transform the context accordingly. In the example above, once the goodbye message is received we finish the context otherwise we just forward the value through the push method.

现在,我们可以通过 transform 方法将 closeClient 组件连接到任意流:

Now, we can connect the closeClient component to an arbitrary flow through the transform method:

val connection = Tcp().outgoingConnection(address, port)

val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("
"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .transform(() ⇒ closeClient)
  .map(_ ⇒ StdIn.readLine("> "))
  .map(_ + "
")
  .map(ByteString(_))

connection.join(flow).run()

上面的流程接收到一个ByteString并返回一个ByteString,这意味着它可以通过join<连接到connection/代码>方法.在流程内部,我们首先将字节转换为字符串,然后再将它们发送到 closeClient.如果 PushStage 没有完成流,元素将在流中转发,在那里它被丢弃并替换为来自 stdin 的一些输入,然后通过线路发回.如果流完成,则舞台组件之后的所有进一步流处理步骤都将被删除 - 流现在已关闭.

The flow above receives a ByteString and returns a ByteString, which means it can be connected to connection through the join method. Inside of the flow we first convert the bytes to a string before we send them to closeClient. If the PushStage doesn't finish the stream, the element is forwarded in the stream, where it gets dropped and replaced by some input from stdin, which is then sent back over the wire. In case the stream is finished, all further stream processing steps after the stage component will be dropped - the stream is now closed.

这篇关于从服务器关闭 akka-http websocket 连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆