如何正确地从 Spring WebFlux 中的多个 Fluxes (WebsocketSession::receive) 向 Sink 发出值? [英] How to correctly emit values to Sink from multiple Fluxes (WebsocketSession::receive) in Spring WebFlux?

查看:60
本文介绍了如何正确地从 Spring WebFlux 中的多个 Fluxes (WebsocketSession::receive) 向 Sink 发出值?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在我的简化案例中,我想将 WebSocket 客户端发送的消息广播给所有其他客户端.该应用程序是使用响应式 websockets 和 Spring 构建的.

In my simplified case I want to broadcast a message sent by WebSocket client to all other clients. The application is built using reactive websockets with Spring.

我的想法是使用单Sink 并且如果从客户端收到消息,则在此接收器上发出它.WebsocketSession::send 只是将这个 Sink 发出的事件转发给连接的客户端.

My idea was to use single Sink and if a message is received from the client, emit it on this sink. WebsocketSession::send just forwards events emitted by this Sink to connected clients.

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .doOnNext {
                    sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                }
                .then()
        val output = session.send(sink.asFlux().map { message -> session.textMessage(toJson(message)) })

        return Mono.zip(input, output).then()
    }

    fun toJson(obj : Any) : String = objectMapper.writeValueAsString(obj)

    fun <T> fromJson(json : String, clazz : Class<T>) : T{
        return objectMapper.readValue(json, clazz)
    }

}

这个实现并不安全,因为 Sink.emitNext 可以从不同的线程调用.

This implementation is not safe as Sink.emitNext can be called from different threads.

我的尝试是使用 publishOn 并传递一个单线程 Scheduler 以便 onNext 用于所有 WebSocketSession从单个线程调用.然而这不起作用.从 websocket 客户端发出一项,然后所有后续的 websocket 客户端在连接后立即收到 onClose 事件:

My attempt was to use publishOn and pass a singled threaded Scheduler so that onNext for all WebSocketSessions is called from a single thread. However this does not work. One item is emitted from a websocket client and then all subsequent websocket clients receive onClose event immediately after connection :

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    private val scheduler = Schedulers.newSingle("sink-scheduler")

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .publishOn(scheduler) // publish on single threaded scheduler
                .doOnNext {
                    sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                }
                .then()
        ...
    }

}

我能看到的另一个选项是在一些公共锁上同步,以便发射是线程安全的:

Another option which I could see is to synchronize on some common lock so that emission is thread safe :

@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
                               private val objectMapper : ObjectMapper) : WebSocketHandler {

    private val lock = Any()

    override fun handle(session: WebSocketSession): Mono<Void> {

        val input = session.receive()
                .doOnNext {
                    synchronized(lock) {
                        sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
                    }
                }
                .then()
        ...
    }


}

但是我不确定是否应该这样做.

However I am not sure if this should be done like that.

在这种情况下是否可以使用 publishOn 以便发射是线程安全的,如果不是,还有什么其他解决方案可以解决这个问题(除了使用同步,就像我对 synchronized 关键字).

Is it possible to use publishOn in this case so that emission is thread safe and if not what is other solution to this problem (apart of using synchronization like I have done with synchronized keyword).

推荐答案

代替使用 synchronized 选项的悲观锁定,您可以创建一个与 相当的 EmitFailureHandlerFAIL_FAST 除了它为 EmitResult.NON_SERIALIZED_ACCESS 返回 true.

Instead of pessimistic locking with the synchronized option, you could create an EmitFailureHandler comparable to FAIL_FAST except it returns true for EmitResult.NON_SERIALIZED_ACCESS.

这将导致立即重试并发发射尝试,就像在忙循环中一样.

This would result in the concurrent emit attempts to be immediately retried, like in a busy loop.

乐观地,这最终会成功.如果您想对无限循环进行额外防御,您甚至可以使自定义处理程序引入延迟或限制其返回 true 的次数.

Optimistically, this will end up succeeding. You can even make the custom handler introduce a delay or limit the number of times it returns true if you want to be extra defensive against infinite loops.

这篇关于如何正确地从 Spring WebFlux 中的多个 Fluxes (WebsocketSession::receive) 向 Sink 发出值?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆