如何正确地从 Spring WebFlux 中的多个 Fluxes (WebsocketSession::receive) 向 Sink 发出值? [英] How to correctly emit values to Sink from multiple Fluxes (WebsocketSession::receive) in Spring WebFlux?
问题描述
在我的简化案例中,我想将 WebSocket 客户端发送的消息广播给所有其他客户端.该应用程序是使用响应式 websockets 和 Spring 构建的.
In my simplified case I want to broadcast a message sent by WebSocket client to all other clients. The application is built using reactive websockets with Spring.
我的想法是使用单Sink
并且如果从客户端收到消息,则在此接收器上发出它.WebsocketSession::send
只是将这个 Sink
发出的事件转发给连接的客户端.
My idea was to use single
Sink
and if a message is received from the client, emit it on this sink. WebsocketSession::send
just forwards events emitted by this Sink
to connected clients.
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.doOnNext {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
.then()
val output = session.send(sink.asFlux().map { message -> session.textMessage(toJson(message)) })
return Mono.zip(input, output).then()
}
fun toJson(obj : Any) : String = objectMapper.writeValueAsString(obj)
fun <T> fromJson(json : String, clazz : Class<T>) : T{
return objectMapper.readValue(json, clazz)
}
}
这个实现并不安全,因为 Sink.emitNext
可以从不同的线程调用.
This implementation is not safe as Sink.emitNext
can be called from different threads.
我的尝试是使用 publishOn
并传递一个单线程 Scheduler
以便 onNext
用于所有 WebSocketSession
从单个线程调用.然而这不起作用.从 websocket 客户端发出一项,然后所有后续的 websocket 客户端在连接后立即收到 onClose 事件:
My attempt was to use publishOn
and pass a singled threaded Scheduler
so that onNext
for all WebSocketSession
s is called from a single thread. However
this does not work. One item is emitted from a websocket client and then all subsequent websocket clients receive onClose event immediately after connection :
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
private val scheduler = Schedulers.newSingle("sink-scheduler")
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.publishOn(scheduler) // publish on single threaded scheduler
.doOnNext {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
.then()
...
}
}
我能看到的另一个选项是在一些公共锁上同步
,以便发射是线程安全的:
Another option which I could see is to synchronize
on some common lock so that emission is thread safe :
@Component
class ReactiveWebSocketHandler(private val sink: Sinks.Many<Message>,
private val objectMapper : ObjectMapper) : WebSocketHandler {
private val lock = Any()
override fun handle(session: WebSocketSession): Mono<Void> {
val input = session.receive()
.doOnNext {
synchronized(lock) {
sink.emitNext(fromJson(it.payloadAsText, Message::class.java), Sinks.EmitFailureHandler.FAIL_FAST)
}
}
.then()
...
}
}
但是我不确定是否应该这样做.
However I am not sure if this should be done like that.
在这种情况下是否可以使用 publishOn
以便发射是线程安全的,如果不是,还有什么其他解决方案可以解决这个问题(除了使用同步,就像我对 synchronized
关键字).
Is it possible to use publishOn
in this case so that emission is thread safe and if not what is other solution to this problem (apart of using synchronization like I have done with synchronized
keyword).
推荐答案
代替使用 synchronized
选项的悲观锁定,您可以创建一个与 相当的
除了它为 EmitFailureHandler
FAIL_FASTEmitResult.NON_SERIALIZED_ACCESS
返回 true
.
Instead of pessimistic locking with the synchronized
option, you could create an EmitFailureHandler
comparable to FAIL_FAST
except it returns true
for EmitResult.NON_SERIALIZED_ACCESS
.
这将导致立即重试并发发射尝试,就像在忙循环中一样.
This would result in the concurrent emit attempts to be immediately retried, like in a busy loop.
乐观地,这最终会成功.如果您想对无限循环进行额外防御,您甚至可以使自定义处理程序引入延迟或限制其返回 true
的次数.
Optimistically, this will end up succeeding. You can even make the custom handler introduce a delay or limit the number of times it returns true
if you want to be extra defensive against infinite loops.
这篇关于如何正确地从 Spring WebFlux 中的多个 Fluxes (WebsocketSession::receive) 向 Sink 发出值?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!