如何在响应流中将消息从订阅者发送到 Web 套接字连接中的发布者 [英] How to send a message in a reactive stream from a subscriber to a publisher in a web socket connection

查看:21
本文介绍了如何在响应流中将消息从订阅者发送到 Web 套接字连接中的发布者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的应用程序有一个 Akka-Websocket 接口.Web 套接字由参与者订阅者和参与者发布者组成.订阅者通过将命令发送给相应的参与者来处理命令.发布者侦听事件流并将更新信息发布回流(并最终发布到客户端).这很好用.

我的问题:订阅者如何将事件发送回流?例如确认接收到的命令的执行.

public class WebSocketApp extends HttpApp {private static final Gson gson = new Gson();@覆盖公共路线 createRoute() {返回获取(路径(度量").路由(handleWebSocketMessages(度量())));}私人流<消息,消息,?>指标(){Sink<消息,ActorRef>metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props());源<消息,ActorRef>指标来源 =Source.actorPublisher(WebSocketDataPublisherActor.props()).map((measurementData) -> TextMessage.create(gson.toJson(measurementData)));返回 Flow.fromSinkAndSource(metricsSink, metricsSource);}}

一个不错的解决方案可能是,订阅参与者(上面代码中的 WebSocketCommandSubscriber 参与者)可以像 sender().tell(...)...

解决方案

不,这是不可能的,至少不是直接的.流始终是单向的——所有消息都向一个方向流动,而对它们的需求则向相反的方向流动.您需要将确认消息从接收器传递到源,以便后者将其发送回客户端,例如,通过在接收器参与者中注册源参与者.它可能看起来像这样:

Flow.fromSinkAndSourceMat(metricsSink, metricsSource, (sinkActor, sourceActor) -> {sinkActor.tell(new RegisterSource(sourceActor), null);})

然后,在您的接收器 Actor 收到 RegisterSource 消息后,它可以将确认消息发送到提供的 ActorRef,然后将它们转发到输出流.>

My application has an Akka-Websocket interface. The web socket consists of an actor-subscriber and an actor publisher. The subscriber handles commands, by sending them to the corresponding actor. The publisher listens on the event stream and publishes update informations back to the stream (and so finally to the client). This works well.

My question: How is it possible for the Subscriber to send an event back to the stream? For example to confirm the execution of a received command.

public class WebSocketApp extends HttpApp {

  private static final Gson gson = new Gson();

  @Override
  public Route createRoute() {
    return get(
        path("metrics").route(handleWebSocketMessages(metrics()))
        );
  }

  private Flow<Message, Message, ?> metrics() {
    Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props());
    Source<Message, ActorRef> metricsSource = 
        Source.actorPublisher(WebSocketDataPublisherActor.props())
        .map((measurementData) -> TextMessage.create(gson.toJson(measurementData)));
    return Flow.fromSinkAndSource(metricsSink, metricsSource);
  }
}

A nice solution could be, that the subscribing actor (the WebSocketCommandSubscriber actor in the code above) could send a message back to the stream like sender().tell(...)...

解决方案

No, it is not possible, not directly, at least. Streams are always unidirectional - all messages flow in one direction, while demand for them flows in the opposite direction. You need to pass your confirmation messages from the sink to the source for the latter to emit it back to the client, for example, by registering the source actor in the sink actor. It could look like this:

Flow.fromSinkAndSourceMat(metricsSink, metricsSource, (sinkActor, sourceActor) -> {
    sinkActor.tell(new RegisterSource(sourceActor), null);
})

Then, after your sink actor receives RegisterSource message, it can send the confirmation messages to the provided ActorRef, which will then forward them to the output stream.

这篇关于如何在响应流中将消息从订阅者发送到 Web 套接字连接中的发布者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆