从服务器关闭akka-http websocket连接 [英] Close akka-http websocket connection from server

查看:159
本文介绍了从服务器关闭akka-http websocket连接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在我的情况下,客户端发送再见 websocket消息,因此我需要在服务器端关闭以前建立的连接。

In my scenario, a client sends "goodbye" websocket message and I need to close previously established connection at the server side.

来自akka-http 文档

From akka-http docs:


通过取消服务器逻辑中的传入连接流(例如,通过将其下游连接到Sink.cancelled并将其上游连接到a),可以关闭连接Source.empty)。也可以通过取消IncomingConnection源连接来关闭服务器的套接字。

Closing connections is possible by cancelling the incoming connection Flow from your server logic (e.g. by connecting its downstream to a Sink.cancelled and its upstream to a Source.empty). It is also possible to shut down the server's socket by cancelling the IncomingConnection source connections.

但是我不清楚如何做到这一点。考虑到在协商新连接时将 Sink Source 设置一次:

But it's not clear to me how to do that taking into account that Sink and Source are set once when negotiating a new connection:

(get & path("ws")) {
  optionalHeaderValueByType[UpgradeToWebsocket]() {
    case Some(upgrade) ⇒
      val connectionId = UUID()
      complete(upgrade.handleMessagesWithSinkSource(sink, source))
    case None ⇒
      reject(ExpectedWebsocketRequestRejection)
  }
}


推荐答案

提示:此答案基于 akka-stream-experimental 版本 2.0-M2 。该API在其他版本中可能会略有不同。

HINT: This answer is based on akka-stream-experimental version 2.0-M2. The API may be slightly different in other versions.

一种简单的关闭连接的方法是使用 PushStage

An easy way to close the connection is by using a PushStage:

import akka.stream.stage._

val closeClient = new PushStage[String, String] {
  override def onPush(elem: String, ctx: Context[String]) = elem match {
    case "goodbye" ⇒
      // println("Connection closed")
      ctx.finish()
    case msg ⇒
      ctx.push(msg)
  }
}

在客户端或服务器端收到的每个元素(通常每个通过<$ c的元素$ c> Flow )经过这样的 Stage 组件。在Akka中,完整的抽象称为 GraphStage ,有关更多信息,请参见官方文档

Every element that is received at the client side or at the server side (and in general every element that goes through a Flow) goes through such a Stage component. In Akka, the full abstraction is called GraphStage, more information can be found in the official documentation.

PushStage 中,我们可以查看具体传入元素的值,然后相应地转换上下文。在上面的示例中,一旦收到再见消息,我们将完成上下文,否则我们将通过 push 转发值

With a PushStage we can watch concrete incoming elements for their value and than transform the context accordingly. In the example above, once the goodbye message is received we finish the context otherwise we just forward the value through the push method.

现在,我们可以通过 closeClient 组件连接到任意流转换方法:

Now, we can connect the closeClient component to an arbitrary flow through the transform method:

val connection = Tcp().outgoingConnection(address, port)

val flow = Flow[ByteString]
  .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
  .map(_.utf8String)
  .transform(() ⇒ closeClient)
  .map(_ ⇒ StdIn.readLine("> "))
  .map(_ + "\n")
  .map(ByteString(_))

connection.join(flow).run()

以上流程收到 ByteString 并返回 ByteString ,这意味着它可以通过 join 方法连接到 connection 。在流内部,我们首先将字节转换为字符串,然后再将其发送到 closeClient 。如果 PushStage 没有完成流,则将该元素转发到流中,在该流中将其删除并替换为stdin的某些输入,然后将其通过线。如果流完成,则将删除阶段组件之后的所有其他流处理步骤-流现在已关闭。

The flow above receives a ByteString and returns a ByteString, which means it can be connected to connection through the join method. Inside of the flow we first convert the bytes to a string before we send them to closeClient. If the PushStage doesn't finish the stream, the element is forwarded in the stream, where it gets dropped and replaced by some input from stdin, which is then sent back over the wire. In case the stream is finished, all further stream processing steps after the stage component will be dropped - the stream is now closed.

这篇关于从服务器关闭akka-http websocket连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆