在 Reactor Flux 中,onCancel 调用不会返回到调用方法 [英] In Reactor Flux an onCancel call is not returned to calling method
问题描述
我有一组方法来处理 Flux
所以
I have a set of methods that handle a Flux<String>
so
@GetMapping(value = "/web/{s}")
public Flux<String> getWeb(@PathVariable String s) {
return myService.getFlux(s);
}
在 service
类中:
public Flux<String> getFlux(String json) throws Exception {
return handleRequest(json);
}
private Flux<String> handleRequest(String json) throws Exception {
Many<String> sink = Sinks.many().multicast().directBestEffort();
... do stuff with sink ...
/* Handle flux completion, termination, error */
Flux<String> webFlux = sink.asFlux()
.doOnTerminate(() -> { disposeSink(id); })
.doOnComplete(() -> { completeSink(id); })
.doAfterTerminate(() -> { terminateSink(id); })
.doOnCancel(() -> { cancelSink(id); })
return webFlux;
}
当我让客户端在请求完成之前缩短、取消、终止他们的请求时,doOnCancel
调用仅在它附加到初始调用方法时才有效,即
When I get the client to cut short, cancel, terminate their request before it completes, the doOnCancel
call only works if it is attached to the initial calling method, i.e.
@GetMapping(value = "/web/{s}")
public Flux<String> getWeb(@PathVariable String s) {
return myService.getFlux(s).doOnCancel(() -> Log.Info("Cancelled"));
}
这不是我真正想放的地方,尽管考虑到它是面向客户的呼叫,这是有道理的.为什么 service
类中的 doOnCancel
钩子没有返回给调用者?传递回调用方法的通量是一种发布/订阅自身吗?!!
This is not really where I want to put it, although it makes sense given it's the client facing call. Why doesn't the doOnCancel
hook in the service
class get back to the caller? Is a flux that's passed back to calling methods kind of publishing / subscribing to itself ?!!
推荐答案
这有点奇怪,但我不得不像这样直接返回带有钩子的 sink.asFlux()
:
This was a bit weird but I had to return the sink.asFlux()
with the hooks directly like this:
private Flux<String> handleRequest(String json) throws Exception {
Many<String> sink = Sinks.many().multicast().directBestEffort();
... do stuff with sink ...
/* Handle flux completion, termination, error */
return sink.asFlux()
.doOnTerminate(() -> { disposeSink(id); })
.doOnComplete(() -> { completeSink(id); })
.doAfterTerminate(() -> { terminateSink(id); })
.doOnCancel(() -> { cancelSink(id); });
}
然后在面向客户端的类上,当客户端随机取消请求时,一切正常.我也把 log()
放在那个类中
Then on the client facing class everything worked when the client randomly cancels the request. I also put the log()
in that class
@GetMapping(value = "/web/{s}")
public Flux<String> getWeb(@PathVariable String s) {
return myService.getFlux(s).log();
}
所以基本上没有直接的 Flux
初始化只是转换一次 sink.asFlux()
并且 return
它备份堆栈.
So basically there is no direct Flux
initialization just transform a sink.asFlux()
the one time and return
it back up the stack.
这篇关于在 Reactor Flux 中,onCancel 调用不会返回到调用方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!