Spring Boot RSocketRequester处理服务器重启 [英] Spring Boot RSocketRequester deal with server restart

查看:605
本文介绍了Spring Boot RSocketRequester处理服务器重启的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我对Springs RSocketRequester有疑问.我有一个rsocket服务器和客户端.客户端连接到该服务器并请求@MessageMapping端点.它按预期工作.

I have a question about Springs RSocketRequester. I have a rsocket server and client. Client connects to this server and requests @MessageMapping endpoint. It works as expected.

但是,如果我重新启动服务器该怎么办.如何从客户端自动重新连接到rsocket服务器?谢谢

But what if I restart the server. How to do automatic reconnect to rsocket server from client? Thanks

服务器:

@Controller
class RSC {

    @MessageMapping("pong")
    public Mono<String> pong(String m) {
        return Mono.just("PONG " + m);
    }
}

客户:

@Bean
    public RSocketRequester rSocketRequester() {
        return RSocketRequester
                .builder()
                .connectTcp("localhost", 7000)
                .block();

    }

@RestController
class RST {

    @Autowired
    private RSocketRequester requester;

    @GetMapping(path = "/ping")
    public Mono<String> ping(){
        return this.requester
                .route("pong")
                .data("TEST")
                .retrieveMono(String.class)
                .doOnNext(System.out::println);
    }
}

推荐答案

您可以使用可恢复客户端实现此目标.该客户端应对RejectedResumeException做出反应并重新创建自己.

You could achieve it with resumable client. This client should react on RejectedResumeException and recreate itself.

客户:

@Component
class RSocketRequesterSupplier implements Supplier<RSocketRequester> {
    private static final AtomicReference<RSocketRequester> R_SOCKET_REQUESTER =
            new AtomicReference<>();

    @Autowired
    private RSocketRequester.Builder rSocketRequesterBuilder;

    @PostConstruct
    void init() {
        rSocketRequesterBuilder
                .rsocketFactory(rsocketFactory -> rsocketFactory
                        .errorConsumer(throwable -> {
                            if (throwable instanceof RejectedResumeException) {
                                init();
                            }
                        })
                        .resume()
                        // tune it for your requirements
                        .resumeStreamTimeout(Duration.ofSeconds(1))
                        .resumeStrategy(() -> new PeriodicResumeStrategy(
                                Duration.ofSeconds(1)))
                )
                .connectTcp("localhost", 7000)
                .retryBackoff(Integer.MAX_VALUE, Duration.ofSeconds(1))
                .subscribe(R_SOCKET_REQUESTER::set);
    }

    @Override
    public RSocketRequester get() {
        return R_SOCKET_REQUESTER.get();
    }
}

@RestController
public class RST {
    @Autowired
    private RSocketRequesterSupplier rSocketRequesterSupplier;

    @GetMapping(path = "/ping")
    public Mono<String> ping() {
        return rSocketRequesterSupplier.get()
                .route("pong")
                .data("TEST")
                .retrieveMono(String.class)
                .doOnNext(System.out::println);
    }
}

服务器:

@Bean
ServerRSocketFactoryProcessor serverRSocketFactoryProcessor() {
    return RSocketFactory.ServerRSocketFactory::resume;
}

这篇关于Spring Boot RSocketRequester处理服务器重启的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆