在连接源和接收器之前,等待客户端websocket流连接 [英] Waiting for a client websocket flow to connect before connecting source and sink

查看:193
本文介绍了在连接源和接收器之前,等待客户端websocket流连接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用akka-streams设置客户端网络套接字。我正在尝试使用以下签名的方法封装安装程序:

I'm using akka-streams to set up a client web socket. I'm trying to encapsulate the setup in a method with the following signature:

def createConnectedWebSocket(url: String): Flow[Message, Message, _]

很明显如何创建Web套接字流,但未连接还没有:

It is clear how to create the web socket flow but it is not connected yet:

val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
    Http().webSocketClientFlow(WebSocketRequest(url))

我首先要等待升级响应将来,然后返回套接字流。但是,为了获得未来,我必须实现流程,为此必须连接 Source Sink 。但这应该由其他适配器类负责,例如,一个适配器类对JSON对象进行序列化和反序列化,并公开一个 Flow [JsValue,JsValue,_] 。它不必担心连接丢失时可能进行连接,甚至可能重新连接(一旦我设法编写该行为,它将成为我方法的更详细版本的一部分)。它应该只需要处理简单的 Flow

I first want to Await the upgrade response future and then return the socket flow. However, in order to get the future, I have to materialize the flow and for that I have to connect a Source and a Sink. But this should be the responsibility of some other adapter class, for instance one that serializes and deserializes json objects and exposes a Flow[JsValue, JsValue, _]. It should not have to worry about connecting and maybe reconnecting when the connection is lost (this behaviour will be part of a more elaborate version of my method once I manage to write it). It should only have to deal with a simple Flow.

我设法通过使用实现了我想要的一部分集线器:

I managed to achieve part of what I want by using hubs:

val mergeHubSource = MergeHub.source[Message](perProducerBufferSize = 16)
val broadcastHubSink = BroadcastHub.sink[Message](bufferSize = 16)

val ((messageSink, upgradeResponse), messageSource) =
  mergeHubSource
    .viaMat(webSocketFlow)(Keep.both)
    .toMat(broadcastHubSink)(Keep.both)
    .run()

所以现在我有一个 Source 和一个 Sink 我可以组合成一个 Flow 并将其返回。问题是,我对集线器功能不感兴趣。当我将 Source 连接到生成的 Flow 并将其关闭时,应将其传播到套接字,即插座应关闭。使用 MergeHub 时,它保持打开状态以便能够接受新资源。

So now I have a Source and a Sink that I can combine to a Flow and return it. The problem is, that I am not interested in the hub functionality. When I connect a Source to the resulting Flow and close it, this should be propagated to the socket, i.e., the socket should close. When using a MergeHub it remains open in order to be able to accept new sources.

这可能吗?我想我可以弥合自定义参与者之间的鸿沟,但是感觉就像我在这里重新发明一些可能已经以另一种形式实现的东西。

Is this possible? I think I could bridge the gap with custom actors but it feels like I'm reinventing something here that is likely already implemented in another form.

推荐答案

我找到了使用 SourceRef SinkRef 。尽管它们是用来弥合两台机器之间的差距的,但它们也可以在这里使用。

I found a solution using SourceRef and SinkRef. Although they are meant to be used to bridge the gap between two machines, they can be used here as well.

val webSocketFlow: Flow[Message, Message, Future[WebSocketUpgradeResponse]] =
    Http().webSocketClientFlow(WebSocketRequest(someUrl))

val (sinkRefFuture, sourceRefFuture) =
  StreamRefs.sinkRef[In]()
    .viaMat(f)(Keep.left)
    .toMat(StreamRefs.sourceRef[Out]())(Keep.both)
    .run()

val flow = Flow.fromSinkAndSource(await(sinkRefFuture), await(sourceRefFuture))

例如,这样定义了 await()

def await[T, F <: T](f: Future[F]): T = Await.result(f, 3.seconds)

$话虽这么说,但我认为,至少在我看来,最好不要预先实现套接字,这实际上是更好的选择。这样,无论使用它的人,都可以重新连接。我现在正在绕过一个流工厂,该工厂可以根据需要创建Web套接字 Flow (可能只实现一次)的新实例。

That being said, I figured that it is actually better, at least in my case, not to materialize the socket in advance. This way whoever uses it can also take care of reconnecting. I'm now passing around a flow factory that creates new instances of the web socket Flow (may only me materialized once) on demand.

这篇关于在连接源和接收器之前,等待客户端websocket流连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆