如何在断开连接后清理 akka-http websocket 资源然后重试? [英] How to clean up akka-http websocket resources following disconnection and then retry?

查看:34
本文介绍了如何在断开连接后清理 akka-http websocket 资源然后重试?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

下面的代码成功建立了一个websocket连接.

websockets 服务器(也是 akk-http)使用 Andrew 在此处建议的答案故意关闭连接.

下面的 SinkActor 收到一条 akka.actor.Status.Failure 类型的消息,所以我知道从服务器到客户端的消息流已经中断.>

我的问题是...我的客户端应该如何重新建立 websocket 连接?source.via(webSocketFlow).to(sink).run() 完成了吗?

清理资源和重试 websocket 连接的最佳实践是什么?

class ConnectionAdminActor 使用 ActorLogging { 扩展 Actor隐式 val 系统:ActorSystem = context.system隐式 val flowMaterializer = ActorMaterializer()private val sinkActor = context.system.actorOf(Props[SinkActor], name = "SinkActor")private val sink = Sink.actorRefWithAck[Message](sinkActor, StartupWithActor(self.path), Ack, Complete)private val source = Source.actorRef[TextMessage](10, OverflowStrategy.dropHead).mapMaterializedValue {参考 =>{自己 !StartupWithActor(ref.path)参考}}私有 val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080"))来源.via(webSocketFlow).下沉).跑()

解决方案

尝试 recoverWithRetries 组合器 (docs 此处).

这允许您提供替代的 Source 您的管道将切换到,以防上游失败.在最简单的情况下,您可以重复使用相同的 Source,它应该会发出一个新的连接.

val wsSource = source via webSocketFlow来源.recoverWithRetries(attempts = -1, {case e: Throwable => wsSource}).下沉)

注意

  • attempts = -1 将无限重试重新连接
  • 部分函数允许更精细地控制哪些异常可以触发重新连接

The code below successfully establishes a websocket connection.

The websockets server (also akk-http) deliberately closes the connection using Andrew's suggested answer here.

The SinkActor below receives a message of type akka.actor.Status.Failure so I know that the flow of messages from Server to Client has been disrupted.

My question is ... How should my client reestablish the websocket connection? Has source.via(webSocketFlow).to(sink).run() completed?

What is best practice for cleaning up the resources and retrying the websocket connection?

class ConnectionAdminActor extends Actor with ActorLogging {
  implicit val system: ActorSystem = context.system
  implicit val flowMaterializer    = ActorMaterializer()

  private val sinkActor = context.system.actorOf(Props[SinkActor], name = "SinkActor")

  private val sink = Sink.actorRefWithAck[Message](sinkActor, StartupWithActor(self.path), Ack, Complete)

  private val source = Source.actorRef[TextMessage](10, OverflowStrategy.dropHead).mapMaterializedValue {
    ref => {
      self ! StartupWithActor(ref.path)
      ref
    }
  }

  private val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080"))

  source
    .via(webSocketFlow)
    .to(sink)
    .run()

解决方案

Try the recoverWithRetries combinator (docs here).

This allows you to provide an alternative Source your pipeline will switch to, in case the upstream has failed. In the most simple case, you can just re-use the same Source, which should issue a new connection.

val wsSource = source via webSocketFlow

wsSource
  .recoverWithRetries(attempts = -1, {case e: Throwable => wsSource})
  .to(sink)

Note that

  • the attempts = -1 will retry to reconnect indefinetely
  • the partial function allows for more granular control over which exception can trigger a reconnect

这篇关于如何在断开连接后清理 akka-http websocket 资源然后重试?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆