如何在Web套接字连接中以无反应流的形式从订阅者向发布者发送消息 [英] How to send a message in a reactive stream from a subscriber to a publisher in a web socket connection

查看:61
本文介绍了如何在Web套接字连接中以无反应流的形式从订阅者向发布者发送消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的应用程序具有Akka-Websocket界面。 Web套接字由一个演员订阅者和一个演员发布者组成。订户通过将命令发送给相应的参与者来处理命令。发布者侦听事件流,然后将更新信息发布回事件流(并最终发布给客户端)。

My application has an Akka-Websocket interface. The web socket consists of an actor-subscriber and an actor publisher. The subscriber handles commands, by sending them to the corresponding actor. The publisher listens on the event stream and publishes update informations back to the stream (and so finally to the client). This works well.

我的问题:订户如何将事件发送回流?例如,确认已执行的命令的执行。

My question: How is it possible for the Subscriber to send an event back to the stream? For example to confirm the execution of a received command.

public class WebSocketApp extends HttpApp {

  private static final Gson gson = new Gson();

  @Override
  public Route createRoute() {
    return get(
        path("metrics").route(handleWebSocketMessages(metrics()))
        );
  }

  private Flow<Message, Message, ?> metrics() {
    Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props());
    Source<Message, ActorRef> metricsSource = 
        Source.actorPublisher(WebSocketDataPublisherActor.props())
        .map((measurementData) -> TextMessage.create(gson.toJson(measurementData)));
    return Flow.fromSinkAndSource(metricsSink, metricsSource);
  }
}

一个不错的解决方案是,订阅方(上面代码中的 WebSocketCommandSubscriber actor可以将消息发送回流,如 sender()。tell(...) ...

A nice solution could be, that the subscribing actor (the WebSocketCommandSubscriber actor in the code above) could send a message back to the stream like sender().tell(...)...

推荐答案

不,至少不可能,不是直接。流始终是单向的-所有消息都沿一个方向流动,而对它们的需求则沿相反方向流动。您需要将确认消息从接收器传递到源,以便后者将其发送回客户端,例如,通过在接收器actor中注册源actor。看起来可能像这样:

No, it is not possible, not directly, at least. Streams are always unidirectional - all messages flow in one direction, while demand for them flows in the opposite direction. You need to pass your confirmation messages from the sink to the source for the latter to emit it back to the client, for example, by registering the source actor in the sink actor. It could look like this:

Flow.fromSinkAndSourceMat(metricsSink, metricsSource, (sinkActor, sourceActor) -> {
    sinkActor.tell(new RegisterSource(sourceActor), null);
})

然后,接收器actor收到 RegisterSource 消息后,可以将确认消息发送到提供的 ActorRef ,然后转发它们到输出流。

Then, after your sink actor receives RegisterSource message, it can send the confirmation messages to the provided ActorRef, which will then forward them to the output stream.

这篇关于如何在Web套接字连接中以无反应流的形式从订阅者向发布者发送消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆