断开连接后如何清理akka-http websocket资源,然后重试? [英] How to clean up akka-http websocket resources following disconnection and then retry?

查看:273
本文介绍了断开连接后如何清理akka-http websocket资源,然后重试?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

下面的代码成功建立了一个Websocket连接。

The code below successfully establishes a websocket connection.

websockets服务器(也称为akk-http)故意使用安德鲁的建议答案在这里

The websockets server (also akk-http) deliberately closes the connection using Andrew's suggested answer here.

SinkActor 下面收到类型为 akka.actor.Status.Failure 的消息,因此我知道从服务器到客户端的消息流已中断。

The SinkActor below receives a message of type akka.actor.Status.Failure so I know that the flow of messages from Server to Client has been disrupted.

我的问题是...我的客户应如何重新建立websocket连接? source.via(webSocketFlow).to(sink).run()是否已完成?

My question is ... How should my client reestablish the websocket connection? Has source.via(webSocketFlow).to(sink).run() completed?

清理资源和重试Websocket连接的最佳实践是什么?

What is best practice for cleaning up the resources and retrying the websocket connection?

class ConnectionAdminActor extends Actor with ActorLogging {
  implicit val system: ActorSystem = context.system
  implicit val flowMaterializer    = ActorMaterializer()

  private val sinkActor = context.system.actorOf(Props[SinkActor], name = "SinkActor")

  private val sink = Sink.actorRefWithAck[Message](sinkActor, StartupWithActor(self.path), Ack, Complete)

  private val source = Source.actorRef[TextMessage](10, OverflowStrategy.dropHead).mapMaterializedValue {
    ref => {
      self ! StartupWithActor(ref.path)
      ref
    }
  }

  private val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080"))

  source
    .via(webSocketFlow)
    .to(sink)
    .run()


推荐答案

尝试 recoveryWithRetries 组合器(docs 在这里)。

Try the recoverWithRetries combinator (docs here).

这可让您提供管道将切换到的替代 Source ,以防上游发生故障。在最简单的情况下,您可以重复使用相同的 Source ,它应该发出新的连接。

This allows you to provide an alternative Source your pipeline will switch to, in case the upstream has failed. In the most simple case, you can just re-use the same Source, which should issue a new connection.

val wsSource = source via webSocketFlow

wsSource
  .recoverWithRetries(attempts = -1, {case e: Throwable => wsSource})
  .to(sink)

请注意


  • 尝试= -1 将尝试无限期地重新连接

  • 部分功能可提供更多功能精细控制哪些异常可以触发重新连接

  • the attempts = -1 will retry to reconnect indefinetely
  • the partial function allows for more granular control over which exception can trigger a reconnect

这篇关于断开连接后如何清理akka-http websocket资源,然后重试?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆