基于“资源"的BroadcastHub过滤可以在网络上进行.连接的客户端正在工作? [英] BroadcastHub filtering based on "resource" the connected client is working on?

查看:85
本文介绍了基于“资源"的BroadcastHub过滤可以在网络上进行.连接的客户端正在工作?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在编写一个纯Websocket Web应用程序,这意味着在Websocket升级之前,没有用户/客户端步骤,更具体地说: 身份验证请求会像其他通讯一样通过websockets

I am writing a pure websocket web application, meaning that prior to the websocket upgrade there is no user/client step, more specifically: Authentication request goes over websockets as does the rest of the communication

有/有:

  • /api/ws上只有一个websocket端点
  • 连接到该端点的多个客户端
  • 多个客户的多个项目

现在,不是每个客户端都可以访问每个项目-该访问控制是在服务器端(ofc)上实现的,与websocket本身无关.

Now, not each client has access to each project - the access control for that is implemented on the server side (ofc) and has nothing to do with websockets per se.

我的问题是,我希望允许协作,这意味着N个客户可以一起处理一个项目.

My problem is, that I want to allow collaboration, meaning that N clients can work on 1 project together.

现在,如果这些客户端之一修改了某些内容,我想通知正在处理该项目的所有其他客户端.

Now if one of these clients modifies something, I want to notify all the other clients that are working on THAT project.

这尤其重要,因为atm是我唯一从事此工作并对其进行测试的人,这是我这方面的主要监督,因为现在:

This is especially important, because atm I am the only one working on this and testing it and this is major oversight on my side, because right now:

如果客户端A连接到Project X,而客户端B连接到Proyjct,则如果其中任何一个更新了各自项目中的某些内容,则另一个将收到有关这些更改的通知.

if Client A connects to Project X and Client B connects to Proejct Y, if any of them update something in their respective project, the other one gets notified of those changes.

现在我的WebsocketController很简单,我基本上有这个:

Now my WebsocketController is rather simple, I basically have this:

private val fanIn = MergeHub.source[AllowedWSMessage].to(sink).run()
private val fanOut = source.toMat(BroadcastHub.sink[AllowedWSMessage])(Keep.right).run()

def handle: WebSocket = WebSocket.accept[AllowedWSMessage, AllowedWSMessage]
{
  _ => Flow.fromSinkAndSource(fanIn, fanOut)
}

现在,根据我的理解,我要么需要

Now from my understanding what I would need is either

1)每个项目有多个websocket端点,例如/api/{project_identifier}/ws

1) Multiple websocket endpoints per project, like /api/{project_identifier}/ws

(X)OR

2)一些基于WebSocket连接/所连接的客户端正在工作的项目进行拆分的方法.

2) Some means of splitting the WebSocket connections/the connected clients based on the project they are working.

由于我不想走路线1),我会在2)上分享我的想法:

As I would prefer not to go route 1) I'll share my thoughts on 2):

我暂时看不到解决方法的问题是,我可以轻松地在服务器端创建一些集合,以便在任何给定时刻存储哪个用户正在处理哪个项目(例如,如果他们选择/切换)一个项目,客户端将其发送到服务器,并且该项目存储了此信息)

The problem I don't see a workaround for now, is that I may easily create some collection on the server side, where I store which user is working on which project at any given moment (like, if they choose/switch a project, the client sends that to the server and that one stores this information)

但是我仍然有那个fanOut,所以这不能解决我有关WebSocket/AkkaStreams的问题.

BUT I still have that one fanOut, so this would not solve my problem in regards to the WebSocket/AkkaStreams.

BroadcastHub上是否有需要执行的魔术(过滤)操作?

Is there some magic (filtering) to be invoked on BroadcastHub that does what I want?

尝试但未能应用@James Roper的良好提示后,现在在这里共享我的整个websocket逻辑:

edit: Sharing my whole websocket logic here now, after trying but failing to apply the good hints of @James Roper:

 class WebSocketController @Inject()(implicit cc: ControllerComponents, ec: ExecutionContext, system: ActorSystem, mat: Materializer) extends AbstractController(cc)

{ val logger:Logger = Logger(this.getClass())

{ val logger: Logger = Logger(this.getClass())

type WebSocketMessage = Array[Byte]

import scala.concurrent.duration._

val tickingSource: Source[WebSocketMessage, Cancellable] =
  Source.tick(initialDelay = 1 second, interval = 10 seconds, tick = NotUsed)
    .map(_ => Wrapper().withKeepAlive(KeepAlive()).toByteArray)

private val generalActor = system.actorOf(Props
{
  new myActor(system, "generalActor")
}, "generalActor")

private val serverMessageSource = Source
  .queue[WebSocketMessage](10, OverflowStrategy.backpressure)
  .mapMaterializedValue
  { queue => generalActor ! InitTunnel(queue) }

private val sink: Sink[WebSocketMessage, NotUsed] = Sink.actorRefWithAck(generalActor, InternalMessages.Init(), InternalMessages.Acknowledged(), InternalMessages.Completed())
private val source: Source[WebSocketMessage, Cancellable] = tickingSource.merge(serverMessageSource)

private val fanIn = MergeHub.source[WebSocketMessage].to(sink).run()
private val fanOut = source.toMat(BroadcastHub.sink[WebSocketMessage])(Keep.right).run()

// TODO switch to WebSocket.acceptOrResult
def handle: WebSocket = WebSocket.accept[WebSocketMessage, WebSocketMessage]
  {
    //_ => createFlow()
    _ => Flow.fromSinkAndSource(fanIn, fanOut)
  }

private val projectHubs = TrieMap.empty[String, (Sink[WebSocketMessage, NotUsed], Source[WebSocketMessage, NotUsed])]

private def buildProjectHub(projectName: String) =
{
  logger.info(s"building projectHub for $projectName")

  val projectActor = system.actorOf(Props
  {
    new myActor(system, s"${projectName}Actor")
  }, s"${projectName}Actor")

  val projectServerMessageSource = Source
    .queue[WebSocketMessage](10, OverflowStrategy.backpressure)
    .mapMaterializedValue
    { queue => projectActor ! InitTunnel(queue) }

  val projectSink: Sink[WebSocketMessage, NotUsed] = Sink.actorRefWithAck(projectActor, InternalMessages.Init(), InternalMessages.Acknowledged(), InternalMessages.Completed())
  val projectSource: Source[WebSocketMessage, Cancellable] = tickingSource.merge(projectServerMessageSource)

  val projectFanIn = MergeHub.source[WebSocketMessage].to(projectSink).run()
  val projectFanOut = projectSource.toMat(BroadcastHub.sink[WebSocketMessage])(Keep.right).run()

  (projectFanIn, projectFanOut)
}

private def getProjectHub(userName: String, projectName: String): Flow[WebSocketMessage, WebSocketMessage, NotUsed] =
{
  logger.info(s"trying to get projectHub for $projectName")

  val (sink, source) = projectHubs.getOrElseUpdate(projectName, {
    buildProjectHub(projectName)
  })

  Flow.fromSinkAndSourceCoupled(sink, source)
}

private def extractUserAndProject(msg: WebSocketMessage): (String, String) =
{
  Wrapper.parseFrom(msg).`type` match
  {
    case m: MessageType =>
      val message = m.value
      (message.userName, message.projectName)
    case _ => ("", "")
  }
}

private def createFlow(): Flow[WebSocketMessage, WebSocketMessage, NotUsed] =
{
  // broadcast source and sink for demux/muxing multiple chat rooms in this one flow
  // They'll be provided later when we materialize the flow
  var broadcastSource: Source[WebSocketMessage, NotUsed] = null
  var mergeSink: Sink[WebSocketMessage, NotUsed] = null

  Flow[WebSocketMessage].map
  {
    m: WebSocketMessage =>
    val msg = Wrapper.parseFrom(m)
    logger.warn(s"client sent project related message: ${msg.toString}");
    m
  }.map
    {
      case isProjectRelated if !extractUserAndProject(isProjectRelated)._2.isEmpty =>
        val (userName, projectName) = extractUserAndProject(isProjectRelated)

        logger.info(s"userName: $userName, projectName: $projectName")
        val projectFlow = getProjectHub(userName, projectName)

        broadcastSource.filter
        {
          msg =>
            val (_, project) = extractUserAndProject(msg)
            logger.info(s"$project == $projectName")
            (project == projectName)
        }
          .via(projectFlow)
          .runWith(mergeSink)

        isProjectRelated

      case other =>
      {
        logger.info("other")
        other
      }
    } via {
      Flow.fromSinkAndSourceCoupledMat(BroadcastHub.sink[WebSocketMessage], MergeHub.source[WebSocketMessage])
      {
        (source, sink) =>
          broadcastSource = source
          mergeSink = sink

          source.filter(extractUserAndProject(_)._2.isEmpty)
            .map
            { x => logger.info("Non project related stuff"); x }
            .via(Flow.fromSinkAndSource(fanIn, fanOut))
            .runWith(sink)

          NotUsed
      }
    }
}

}

解决方案/想法我的理解:

Solution/Idea how I understood it:

1)我们有一个包装程序流",其中有一个broadcastSource和mergeSink为空,直到我们在外部} via {块中实现它们

1) We have a "wrapper flow" where we have a broadcastSource and mergeSink that are null until we materialize them in the outer } via { block

2)在包装程序流"中,我们映射每个元素以对其进行检查.

2) In that "wrapper flow" we map each element to inspect it.

I)如果与项目有关,我们

I) In case it's project related, we

a)为项目获取/创建自己的子流 b)根据项目名称过滤元素 c)让通过过滤器的对象由sub/project-flow占用,以便与项目连接的每个人都可以获取该元素

a) get/create an own subflow for the project b) filter the elements based on the project name c) let those that pass the filter be consumed by the sub/project-flow so that everyone that is connected to the project gets that element

II)如果与项目无关,我们只需将其传递

II) In case it's not project related, we just pass it on

3)我们的包装器流程是按照按需"实现的流程进行的,在实现的via中,我们将与项目无关的元素分发给所有连接的Web套接字客户端.

3) Our wrapper flow is going by a "on demand" materialized flow, and in the via where it is materialized we let the elements that are not project related be distributed to all connected web socket clients.

总结:我们有一个用于websocket连接的包装器流程",可以通过projectFlow或generalFlow进行连接,具体取决于它正在处理的消息/元素.

To summarize: We have a "wrapper flow" for the websocket connection that either goes via a projectFlow or a generalFlow, depending on the message/element it is working in.

我现在的问题是(似乎很琐碎,但我还是在某种程度上苦苦挣扎)每个消息都应该进入myActor(atm),并且也应该有消息从那里发出(请参阅serverMesssageSourcesource)

My problem now is (and it seems to be trivial, yet I am struggling somehow) that EVERY message should go into the myActor (atm) and there should be messages coming out from there as well (see serverMesssageSource and source)

但是上面的代码正在创建不确定的结果,例如一个客户端发送2条消息,但有4条正在处理(根据服务器发送回的日志和结果),有时消息从控制器到角色的途中突然丢失.

But the above code is creating non-deterministic results, e.g. one client sends 2 messages, but there are 4 being handled (according to logs and results the server sends back), sometimes messages are suddenly lost on their way from the controller to the actor.

我无法解释这一点,但是如果我只把它留给_ => Flow.fromSinkAndSource(fanIn, fanOut),每个人都可以得到一切,但是至少如果只有一个客户,它就可以完成预期的工作(显然:))

I can't explain that, but if I leave it just with _ => Flow.fromSinkAndSource(fanIn, fanOut) everyone gets everything, but at least if there is only one client it does exactly what is expected (obviously :))

推荐答案

我实际上建议使用Play的 socket.io支持.这提供了名称空间,从您的描述中可以看出,这些名称空间可以直接实现所需的内容-每个名称空间都是其自己独立管理的流程,但是所有名称空间都位于同一WebSocket之下.我写了一篇博客文章,说明您为什么选择使用今天的socket.io.

I would actually recommend using Play's socket.io support. This offers namespaces, which from what I can tell from your description make it straight forward to implement exactly what you want - each namespace is its own independently managed flow, but all namespaces go down the same WebSocket. I wrote a blog post about why you might choose to use socket.io today.

如果您不想使用socket.io,我在这里有一个示例(此示例使用socket.io,但不使用socket.io命名空间,因此可以很容易地改编为在直接的WebSockets上运行),该示例一个多聊天室协议-将消息馈送到BroadcastHub,然后为用户当前所属的每个聊天室对中心进行一个订阅(对您来说,每个项目都是一个订阅).这些订阅中的每个订阅都会过滤来自中心的消息,以仅包括该订阅聊天室的消息,然后将消息馈送到该聊天室MergeHub.

If you don't want to use socket.io, I have an example here (this uses socket.io, but doesn't use socket.io namespaces, so could easily be adapted to run on straight WebSockets) which shows a multi chat room protocol - it feeds messages into a BroadcastHub, and then there is one subscription to the hub for each chat room that the user is currently a part of (for you, it would be one subscription for each project). Each of those subscriptions filter the messages from the hub to include only the messages for that subscriptions chat room, and then feed the messages into that chatrooms MergeHub.

这里突出显示的代码根本不是特定于socket.io的,如果您可以将WebSocket连接调整为ChatEvent的流程,则可以按原样使用它:

The highlighted code here is not specific to socket.io at all, if you can adapt the WebSocket connection to be a flow of ChatEvent, you can use this as is:

https://github.com/playframework/play-socket.io/blob/c113e74a4d9b435814df1ccdc885029c397d9179/samples/scala/multi-room-chat/app/chat/ChatEngine.scala#L84-L125

要满足您通过每个人都连接到的广播频道定向非项目特定消息的要求,请首先创建该频道:

To address your requirement to direct non project specific messages through a broadcast channel that everyone connects to, first, create that channel:

val generalFlow = {
  val (sink, source) = MergeHub.source[NonProjectSpecificEvent]
    .toMat(BroadcastHub.sink[NonProjectSpecificEvent])(Keep.both).run
  Flow.fromSinkAndSourceCoupled(sink, source)
}

然后,当每个连接的WebSocket的广播接收器/源连接时,将其附加(这来自聊天示例:

Then, when the broadcast sink/source for each connected WebSocket connects, attach it (this is from the chat example:

} via {
  Flow.fromSinkAndSourceCoupledMat(BroadcastHub.sink[YourEvent], MergeHub.source[YourEvent]) { (source, sink) =>
    broadcastSource = source
    mergeSink = sink

    source.filter(_.isInstanceOf[NonProjectSpecificEvent])
      .via(generalFlow)
      .runWith(sink)

    NotUsed
  }
}

这篇关于基于“资源"的BroadcastHub过滤可以在网络上进行.连接的客户端正在工作?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆