Spring WebFlux 反应式 WebSocket 防止连接关闭 [英] Spring WebFlux reactive WebSocket prevent connection closing

查看:105
本文介绍了Spring WebFlux 反应式 WebSocket 防止连接关闭的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在为我的应用程序开发简单的聊天模块,后端使用 Spring WebFlux 和 ReactiveMongoRepository,前端使用 Angular 4.我能够通过 WebSocketSession 接收数据,但是在从 db 流式传输所有消息后,我想保持连接,以便我可以更新消息列表.任何人都可以给我提供如何实现这一目标的线索,或者我可能遵循了错误的假设?

I'm working on simple chat module for my application using Spring WebFlux with ReactiveMongoRepository on backend and Angular 4 on front. I'm able to receive data through WebSocketSession but after streaming all messages from db i want to keep the connection so i could update message list. Can anyone give me clues how to achieve that, or maybe i'm following wrong assumptions ?

Java 后端负责 WebSocket,我的订阅者只记录当前状态,没有相关内容:

Java Backend responsible for WebSocket, my subscriber only logs current state, nothing relevant there:

WebFlux 配置:

WebFluxConfiguration:

@Configuration
@EnableWebFlux
public class WebSocketConfig {

private final WebSocketHandler webSocketHandler;

@Autowired
public WebSocketConfig(WebSocketHandler webSocketHandler) {
    this.webSocketHandler = webSocketHandler;
}

@Bean
@Primary
public HandlerMapping webSocketMapping() {
    Map<String, Object> map = new HashMap<>();
    map.put("/websocket-messages", webSocketHandler);

    SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
    mapping.setOrder(10);
    mapping.setUrlMap(map);
    return mapping;
}

@Bean
public WebSocketHandlerAdapter handlerAdapter() {
    return new WebSocketHandlerAdapter();
}


}

WebSocketHandler 实现

WebSocketHandler Implementation

@Component
public class MessageWebSocketHandler implements WebSocketHandler {

private final MessageRepository messageRepository;
private ObjectMapper mapper = new ObjectMapper();
private MessageSubscriber subscriber = new MessageSubscriber();

@Autowired
public MessageWebSocketHandler(MessageRepository messageRepository) {
    this.messageRepository = messageRepository;
}

@Override
    public Mono<Void> handle(WebSocketSession session) {
    session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .map(this::toMessage)
            .subscribe(subscriber::onNext, subscriber:: onError, subscriber::onComplete);
    return session.send(
            messageRepository.findAll()
                    .map(this::toJSON)
                    .map(session::textMessage));
}

private String toJSON(Message message) {
    try {
        return mapper.writeValueAsString(message);
    } catch (JsonProcessingException e) {
        throw new RuntimeException(e);
    }
}

private Message toMessage(String json) {
    try {
        return mapper.readValue(json, Message.class);
    } catch (IOException e) {
        throw new RuntimeException("Invalid JSON:" + json, e);
    }
}
}

和 MongoRepo

and MongoRepo

@Repository
public interface MessageRepository extends 
ReactiveMongoRepository<Message,String> {
}

前端处理:

@Injectable()
export class WebSocketService {
  private subject: Rx.Subject<MessageEvent>;

  constructor() {
  }

  public connect(url): Rx.Subject<MessageEvent> {
    if (!this.subject) {
      this.subject = this.create(url);
      console.log('Successfully connected: ' + url);
    }
    return this.subject;
  }

  private create(url): Rx.Subject<MessageEvent> {
    const ws = new WebSocket(url);
    const observable = Rx.Observable.create(
      (obs: Rx.Observer<MessageEvent>) => {
        ws.onmessage = obs.next.bind(obs);
        ws.onerror = obs.error.bind(obs);
        ws.onclose = obs.complete.bind(obs);
        return ws.close.bind(ws);
      });
    const observer = {
      next: (data: Object) => {
        if (ws.readyState === WebSocket.OPEN) {
          ws.send(JSON.stringify(data));
        }
      }
    };
    return Rx.Subject.create(observer, observable);
  }
}

在其他服务中,我从响应到我的类型映射可观察

in other service i'm mapping observable from response to my type

  constructor(private wsService: WebSocketService) {
    this.messages = <Subject<MessageEntity>>this.wsService
      .connect('ws://localhost:8081/websocket-messages')
      .map((response: MessageEvent): MessageEntity => {
        const data = JSON.parse(response.data);
        return new MessageEntity(data.id, data.user_id, data.username, data.message, data.links);
      });
  }

最后订阅了由于连接关闭而无法使用的发送功能:

and finally subscribtion with send function which i can't use because of closed connection:

  ngOnInit() {
    this.messages = [];
    this._ws_subscription = this.chatService.messages.subscribe(
      (message: MessageEntity) => {
        this.messages.push(message);
      },
      error2 => {
        console.log(error2.json());
      },
      () => {
        console.log('Closed');
      }
    );
  }

  sendTestMessage() {
    this.chatService.messages.next(new MessageEntity(null, '59ca30ac87e77d0f38237739', 'mickl', 'test message angular', null));
  }

推荐答案

假设您的聊天消息在收到时被持久化到数据存储,您可以使用 Spring Data MongoDB Reactive 中的 tailable cursors 功能(请参阅 参考文档).

Assuming your chat messages are being persisted to the datastore as they're being received, you could use the tailable cursors feature in Spring Data MongoDB Reactive (see reference documentation).

因此您可以在您的存储库中创建一个新方法,例如:

So you could create a new method on your repository like:

public interface MessageRepository extends ReactiveSortingRepository< Message, String> {

    @Tailable
    Flux<Message> findWithTailableCursor();
}

请注意,可尾游标有一些限制:您的 mongo 集合需要被封顶,并且条目按插入顺序流式传输.

Note that tailable cursors have some limitations: you mongo collection needs to be capped and entries are streamed in their order of insertion.

Spring WebFlux websocket 支持尚不支持 STOMP 或消息代理,但对于此类用例,这可能是更好的选择.

Spring WebFlux websocket support does not yet support STOMP nor message brokers, but this might be a better choice for such a use case.

这篇关于Spring WebFlux 反应式 WebSocket 防止连接关闭的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆