Akka websocket - 如何通过服务器关闭连接? [英] Akka websocket - how to close connection by server?

查看:35
本文介绍了Akka websocket - 如何通过服务器关闭连接?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这里是我的 websocket 服务器实现.

So here is my websocket server implementation.

val route = get {
  pathEndOrSingleSlash {
    handleWebSocketMessages(websocketFlow)
  }
}

def websocketFlow: Flow[Message, Message, Any] =
  Flow[Message]
    .collect { case TextMessage.Strict(textMessage) => protocol.hydrate(textMessage) }
    .via(chatActorFlow(UUID.randomUUID()))
    .map(event => TextMessage.Strict(protocol.serialize(event)))


def chatActorFlow(connectionId: UUID) : Flow[Protocol.Message, Protocol.Event, Any] = {

  val sink = Flow[Protocol.Message]
    .map(msg => Protocol.SignedMessage(connectionId, msg))
    .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

  val source = Source
      .mapMaterializedValue {
        actor : ActorRef => {
          chatRef ! Protocol.OpenConnection(actor, connectionId)
        }
      }

  Flow.fromSinkAndSource(sink, source)
}

我想知道一旦 chatRef 发送了 ConnectionClosed 类型的消息,是否有任何方法可以关闭连接?

I'm wondering if there is any way to close connection once message of type ConnectionClosed is sent by chatRef?

推荐答案

下面的解决方案允许通过终止由 Source.actorRef 阶段实现的 Actor 来断开来自服务器端的连接.这只需向它发送一个 PoisonPill 即可完成.

The solution below allows to drop connections from the server side by terminating the Actor materialized by the Source.actorRef stage. This is simply done by sending a PoisonPill to it.

现在,我仍然不清楚您希望如何在连接时识别被禁止"的客户端,所以这个例子 - 故意 - 非常简单:服务器在达到最大数量的客户端后断开任何连接连接.如果您想随时使用任何其他策略来踢出客户端,您仍然可以应用相同的逻辑并将 PoisonPill 发送给他们自己的源演员.

Now, it is still not clear to me how you'd like to identify a "banned" client at connection time, so the example is - on purpose - very simple: the server drops any connection after a maximum amount of clients are connected. If you want to use any other strategy to kick out clients at any time, you can still apply the same logic and send PoisonPill to their own source actors.

object ChatApp extends App {

  implicit val system = ActorSystem("chat")
  implicit val executor: ExecutionContextExecutor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val route = get {
    pathEndOrSingleSlash {
      handleWebSocketMessages(websocketFlow)
    }
  }

  val maximumClients = 1

  class ChatRef extends Actor {
    override def receive: Receive = withClients(Map.empty[UUID, ActorRef])

    def withClients(clients: Map[UUID, ActorRef]): Receive = {
      case SignedMessage(uuid, msg) => clients.collect{
        case (id, ar) if id == uuid => ar ! msg
      }
      case OpenConnection(ar, uuid) if clients.size == maximumClients => ar ! PoisonPill
      case OpenConnection(ar, uuid) => context.become(withClients(clients.updated(uuid, ar)))
      case CloseConnection(uuid) => context.become(withClients(clients - uuid))
    }
  }

  object Protocol {
    case class SignedMessage(uuid: UUID, msg: String)
    case class OpenConnection(actor: ActorRef, uuid: UUID)
    case class CloseConnection(uuid: UUID)
  }

  val chatRef = system.actorOf(Props[ChatRef])

  def websocketFlow: Flow[Message, Message, Any] =
    Flow[Message]
      .mapAsync(1) {
        case TextMessage.Strict(s) => Future.successful(s)
        case TextMessage.Streamed(s) => s.runFold("")(_ + _)
        case b: BinaryMessage => throw new Exception("Binary message cannot be handled")
      }.via(chatActorFlow(UUID.randomUUID()))
      .map(TextMessage(_))

  def chatActorFlow(connectionId: UUID) : Flow[String, String, Any] = {

    val sink = Flow[String]
      .map(msg => Protocol.SignedMessage(connectionId, msg))
      .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

    val source = Source.actorRef(16, OverflowStrategy.fail)
      .mapMaterializedValue {
        actor : ActorRef => {
          chatRef ! Protocol.OpenConnection(actor, connectionId)
        }
      }

    Flow.fromSinkAndSource(sink, source)
  }

  Http().bindAndHandle(route, "0.0.0.0", 8080)
    .map(_ => println(s"Started server..."))

}

这篇关于Akka websocket - 如何通过服务器关闭连接?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆