在 Reactor Flux 中,onCancel 调用不会返回到调用方法 [英] In Reactor Flux an onCancel call is not returned to calling method

查看:60
本文介绍了在 Reactor Flux 中,onCancel 调用不会返回到调用方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一组方法来处理 Flux 所以

I have a set of methods that handle a Flux<String> so

@GetMapping(value = "/web/{s}")
public Flux<String> getWeb(@PathVariable String s) {
    return myService.getFlux(s);
}

service 类中:

public Flux<String> getFlux(String json) throws Exception {
    return handleRequest(json);
}

private Flux<String> handleRequest(String json) throws Exception {
    Many<String> sink = Sinks.many().multicast().directBestEffort();

    ... do stuff with sink ... 

    /* Handle flux completion, termination, error */
    Flux<String> webFlux = sink.asFlux()
            .doOnTerminate(() -> { disposeSink(id); })
            .doOnComplete(() -> { completeSink(id); })
            .doAfterTerminate(() -> { terminateSink(id); }) 
            .doOnCancel(() -> { cancelSink(id); })

    return webFlux;
}

当我让客户端在请求完成之前缩短、取消、终止他们的请求时,doOnCancel 调用仅在它附加到初始调用方法时才有效,即

When I get the client to cut short, cancel, terminate their request before it completes, the doOnCancel call only works if it is attached to the initial calling method, i.e.

@GetMapping(value = "/web/{s}")
public Flux<String> getWeb(@PathVariable String s) {
    return myService.getFlux(s).doOnCancel(() -> Log.Info("Cancelled"));
}

这不是我真正想放的地方,尽管考虑到它是面向客户的呼叫,这是有道理的.为什么 service 类中的 doOnCancel 钩子没有返回给调用者?传递回调用方法的通量是一种发布/订阅自身吗?!!

This is not really where I want to put it, although it makes sense given it's the client facing call. Why doesn't the doOnCancel hook in the service class get back to the caller? Is a flux that's passed back to calling methods kind of publishing / subscribing to itself ?!!

推荐答案

这有点奇怪,但我不得不像这样直接返回带有钩子的 sink.asFlux() :

This was a bit weird but I had to return the sink.asFlux() with the hooks directly like this:

private Flux<String> handleRequest(String json) throws Exception {
    Many<String> sink = Sinks.many().multicast().directBestEffort();

    ... do stuff with sink ... 

    /* Handle flux completion, termination, error */
    return sink.asFlux()
        .doOnTerminate(() -> { disposeSink(id); })
        .doOnComplete(() -> { completeSink(id); })
        .doAfterTerminate(() -> { terminateSink(id); }) 
        .doOnCancel(() -> { cancelSink(id); });
}

然后在面向客户端的类上,当客户端随机取消请求时,一切正常.我也把 log() 放在那个类中

Then on the client facing class everything worked when the client randomly cancels the request. I also put the log() in that class

@GetMapping(value = "/web/{s}")
public Flux<String> getWeb(@PathVariable String s) {
    return myService.getFlux(s).log();
}

所以基本上没有直接的 Flux 初始化只是转换一次 sink.asFlux() 并且 return 它备份堆栈.

So basically there is no direct Flux initialization just transform a sink.asFlux() the one time and return it back up the stack.

这篇关于在 Reactor Flux 中,onCancel 调用不会返回到调用方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆