在 Reactor Sinks.Many() 中等效于 EmitterProcessor onCancel [英] In Reactor Sinks.Many() what is equivalent to EmitterProcessor onCancel

查看:347
本文介绍了在 Reactor Sinks.Many() 中等效于 EmitterProcessor onCancel的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Per 这里我曾经有代码

EmitterProcessor<String> emitter = EmitterProcessor.create();
FluxSink<String> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);

sink.onCancel(() -> {
  cancelSink(id, request);
});

例如,当使用 rSocket 时,浏览器打开会话并请求一些数据,当客户端关闭浏览器时调用 EmitterProcessor publisher 喜欢

and when for example with rSocket a browser opened a session and asked for some data, calling the EmitterProcessor when a client shut down their browser the publisher like

Flux<String> out = Flux
    .from(emitter
    .log(log.getName())); 

会知道 Flux 订阅者被取消(当浏览器关闭时)并且会调用 onCancel 句柄.

would know that the Flux subscriber was cancelled (when a browser was closed) and that would call the onCancel handle.

使用 Sinks.Many() 我已经实现了

Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

sink.asFlux().doOnCancel(() -> {
    cancelSink(id, request);
});

Flux<String> out = Flux
    .from(sink.asFlux()
    .log(log.getName()));

并且字符串通过通量发布到浏览器,但是当客户端关闭会话时,不再有 onCancel 来处理一些整理工作.

and the strings are published via a flux to the browser, but when the client closes the session there is no longer the onCancel to handle some tidying up.

看起来这是这里讨论过这里 但我不明白解决方案.请问是什么?

It looks like this was discussed here and also here but I don't understand the solutions. What is it please?

推荐答案

sink.asFlux().doOnCancel(...)sink.asFlux() 是两种不同的情况.您没有重用已设置取消处理逻辑的逻辑,这就是为什么您没有观察 out 变量上的 cancelSink 清理.

sink.asFlux().doOnCancel(...) and sink.asFlux() are two different instances. You're not reusing the one where you have set up cancel handling logic, and that is why you don't observe the cancelSink cleanup on your out variable.

做一些类似的事情:

Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

Flux<String> fluxWithCancelSupport = sink.asFlux().doOnCancel(() -> {
    cancelSink(id, request);
});

Flux<String> out = fluxWithCancelSupport
    .log(log.getName()));

(PS:你不需要 Flux.from(sink.asFlux()) 因为后者已经给了你一个 Flux).

(PS: you don't need the Flux.from(sink.asFlux()) since the later already gives you a Flux).

这篇关于在 Reactor Sinks.Many() 中等效于 EmitterProcessor onCancel的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆