Websocket Proxy using Play 2.6 and akka streams

Question

I'm trying to create a simple Proxy for Websocket connections using Play and akka streams. The traffic flow is like this:

(Client) request  ->         -> request (Server)
                      Proxy 
(Client) response <-         <- response (Server)

I came up with the following code after following some examples:

def socket = WebSocket.accept[String, String] { request =>

val uuid = UUID.randomUUID().toString

// wsOut - actor that deals with incoming websocket frame from the Client
// wsIn - publisher of the frame for the Server
val (wsOut: ActorRef, wsIn: Publisher[String]) = {
  val source: Source[String, ActorRef] = Source.actorRef[String](10, OverflowStrategy.dropTail)
  val sink: Sink[String, Publisher[String]] = Sink.asPublisher(fanout = false)
  source.toMat(sink)(Keep.both).run()
}

// sink that deals with the incoming messages from the Server
val serverIncoming: Sink[Message, Future[Done]] =
  Sink.foreach[Message] {
    case message: TextMessage.Strict =>
      println("The server has sent: " + message.text)
  }

// source for sending a message over the WebSocket
val serverOutgoing = Source.fromPublisher(wsIn).map(TextMessage(_))

// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://0.0.0.0:6000"))

// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
val (upgradeResponse, closed) =
serverOutgoing
  .viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
  .toMat(serverIncoming)(Keep.both) // also keep the Future[Done]
  .run()

// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
val connected = upgradeResponse.flatMap { upgrade =>
  if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
    Future.successful(Done)
  } else {
    throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
  }
}

// in a real application you would not side effect here
connected.onComplete(println)
closed.foreach(_ => println("closed"))

val actor = system.actorOf(WebSocketProxyActor.props(wsOut, uuid))
val finalFlow = {
  val sink = Sink.actorRef(actor, akka.actor.Status.Success(()))
  val source = Source.maybe[String] // what the client receives. How to connect with the serverIncoming sink ???
  Flow.fromSinkAndSource(sink, source)
}

finalFlow
}

With this code, the traffic goes from the Client to the Proxy to the Server and back to the Proxy, but no further: it never reaches the Client. How can I fix this? I think I need to somehow connect the serverIncoming sink to the source in finalFlow, but I can't figure out how to do it...

Or is this approach entirely wrong? Would a BidiFlow or a Graph be a better fit? I'm new to akka streams and still trying to figure things out.

Recommended answer

The following seems to work. Note: I've implemented both the server socket and the proxy socket in the same controller, but you can split them or deploy the same controller on separate instances. The ws url to the 'upper' service will need to be updated in both cases.

package controllers

import javax.inject._

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.Materializer
import akka.stream.scaladsl.Flow
import play.api.libs.streams.ActorFlow
import play.api.mvc._

import scala.concurrent.{ExecutionContext, Future}
import scala.language.postfixOps

@Singleton
class SomeController @Inject()(implicit exec: ExecutionContext,
                                actorSystem: ActorSystem,
                                materializer: Materializer) extends Controller {

  /*--- proxy ---*/
  def websocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
    Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9000/upper-socket"))

  def proxySocket: WebSocket = WebSocket.accept[String, String] { _ =>
    Flow[String].map(s => TextMessage(s))
      .via(websocketFlow)
      .map(_.asTextMessage.getStrictText)
  }

  /*--- server ---*/
  class UpperService(socket: ActorRef) extends Actor {
    override def receive: Receive = {
      case s: String => socket ! s.toUpperCase()
      case _ =>
    }
  }

  object UpperService {
    def props(socket: ActorRef): Props = Props(new UpperService(socket))
  }

  def upperSocket: WebSocket = WebSocket.accept[String, String] { _ =>
    ActorFlow.actorRef(out => UpperService.props(out))
  }   
}
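One caveat with proxySocket above: getStrictText throws if the upstream server sends a streamed (non-strict) text message. The following is a minimal sketch, not part of the original answer, of a variant that folds each text message's chunks into one string before handing it to the client. The names StreamedTextSupport and textProxyFlow are made up for illustration, and the sketch assumes the upstream only sends text messages (binary messages are simply dropped here without being drained).

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.Flow

// Hypothetical helper, not from the original answer.
object StreamedTextSupport {

  // Wraps a Message-based upstream flow (e.g. the websocketFlow of the
  // controller) so that it can be returned from WebSocket.accept[String, String].
  def textProxyFlow(upstream: Flow[Message, Message, Any]): Flow[String, String, NotUsed] =
    Flow[String]
      .map(s => TextMessage(s))                     // client String -> strict text frame
      .via(upstream)                                // forward to the upstream server
      .collect { case tm: TextMessage => tm }       // assumption: ignore binary messages
      .flatMapConcat(_.textStream.fold("")(_ + _))  // join chunks of each message into one String
}
```

With this helper, the body of proxySocket could be written as StreamedTextSupport.textProxyFlow(websocketFlow).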

You will need the routes to be set up like this:

GET /upper-socket controllers.SomeController.upperSocket
GET /proxy-socket controllers.SomeController.proxySocket

You can test by sending a string to ws://localhost:9000/proxy-socket. The answer will be the uppercased string.
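If no WebSocket client is at hand, a small akka-http client can do the check described above. This is only a sketch under assumptions: the object name ProxyCheck and the test string "hello" are arbitrary, and it presumes the Play application is already running on localhost:9000 with the routes shown earlier. It sends one text message, prints the first reply (or the failure), and then shuts down.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.{Future, Promise}

// Hypothetical test client, not from the original answer.
object ProxyCheck {

  // Sends `text` once, then keeps the outgoing side open (Source.maybe) so the
  // connection is not closed before the reply arrives. Materializes the first
  // reply and the promise that closes the outgoing side.
  def clientFlow(text: String): Flow[Message, Message, (Future[Message], Promise[Option[Message]])] = {
    val outgoing = Source.single[Message](TextMessage(text))
      .concatMat(Source.maybe[Message])(Keep.right)
    Flow.fromSinkAndSourceMat(Sink.head[Message], outgoing)(Keep.both)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("proxy-check")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val (_, (firstReply, keepOpen)) = Http().singleWebSocketRequest(
      WebSocketRequest("ws://localhost:9000/proxy-socket"), clientFlow("hello"))

    firstReply.onComplete { result =>
      println(result)           // the reply should carry the uppercased text
      keepOpen.trySuccess(None) // complete the outgoing side
      system.terminate()
    }
  }
}
```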

There will be a timeout after 1 minute of inactivity though:

akka.stream.scaladsl.TcpIdleTimeoutException: TCP idle-timeout encountered on connection to [localhost:9000], no bytes passed in the last 1 minute

But see: http://doc.akka.io/docs/akka-http/current/scala/http/common/timeouts.html on how to configure this.
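Since the exception is raised on the proxy's outgoing connection to localhost:9000, one option is to raise the client-side idle timeout for that connection only. The sketch below is an assumption-laden illustration rather than the answer's own code: the object name ProxyClientFlow and the 10-minute value are arbitrary, and the server side may enforce its own idle timeout, which this does not change. The same client setting can also be changed globally through akka.http.client.idle-timeout in application.conf.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, WebSocketRequest, WebSocketUpgradeResponse}
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.scaladsl.Flow

import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical helper, not from the original answer.
object ProxyClientFlow {

  // Default client settings from configuration, with a longer idle timeout.
  // 10 minutes is an arbitrary illustrative value.
  def clientSettings(implicit system: ActorSystem): ClientConnectionSettings =
    ClientConnectionSettings(system).withIdleTimeout(10.minutes)

  // Same flow as websocketFlow in the controller, but with custom settings.
  def websocketFlow(implicit system: ActorSystem): Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
    Http().webSocketClientFlow(
      WebSocketRequest("ws://localhost:9000/upper-socket"),
      settings = clientSettings
    )
}
```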
