Webflux repeatWhenEmpty or retryWhen

Question

I am using Spring Boot with WebFlux and reactive programming. I want to keep calling an endpoint until data is available (that is, until something is returned).

I want to call commandControllerApi.findById until it returns a displayCommand with status == SUCCESS. How can I tell WebFlux that this part of my chain should be called, for example, 5 times, because the data should appear in my database after 5-10 seconds?

I think the current code causes the whole chain to be called again, and not only the relevant part of it (.flatMap(commandResponse -> commandControllerApi.findById(commandResponse.getCommandId()))).

My code:

public Mono<Boolean> validateCredentials(FlowConfCredentials flowCredentials, UUID agentId) {
    return securityService
        .getUser()
        .flatMap(
            user -> {
              Command command = new Command();
              command.setAgentId(agentId.toString());
              command.setCommandType(COMMAND_TYPE);
              command.setArguments(createArguments());
              command.setCreatedBy(
                  user.getEmail());
              return commandControllerApi.saveCommand(command);
            })
        // .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(5)))
        .flatMap(commandResponse -> commandControllerApi.findById(commandResponse.getCommandId()))
        .filter(displayCommand -> displayCommand.getStatus().equals(OaCommandStatus.SUCCESS))
        .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(5)))
        // .repeatWhenEmpty(
        //     Repeat.onlyIf(repeatContext -> true)
        //         .exponentialBackoff(Duration.ofSeconds(5), Duration.ofSeconds(5))
        //         .timeout(Duration.ofSeconds(30)))
        .filter(
            commandResponse ->
                commandResponse.getStatus() != null
                    && commandResponse.getStatus().equals(CommandStatus.SUCCESS))
        .map(commandResponse -> true)
        .switchIfEmpty(Mono.just(false));
  }

Below is the method that calls the method above:

 public Flux<CredConfiguration> saveCredentials(
      Mono<FlowConfCredentials> flowCredentials, UUID agentId) {
    return flowCredentials
        .filterWhen(cred -> validationService.validateCredentials(cred, agentId))
        .flatMapMany(
            flowConfCredentials -> {
              if (condition1()) {
                return caveCredentials(flowConfCredentials);
              }
              if (condition2()) {
                return saveCredentialsForUser(flowConfCredentials.getExistingCredentials());
              }
              return Flux.error(new EmptyConfigurationException(CREDENTIALS_MESSAGE));
            });
  }

Recommended answer

To repeat only the subscription to the Mono returned by findById, without resubscribing to the upstream saveCommand/getUser, move the filter/repeatWhenEmpty inside the flatMap that calls findById. Note that retryWhen resubscribes only on an error signal; a filter that rejects the element completes the Mono empty rather than failing it, which is the case repeatWhenEmpty handles.

public Mono<Boolean> validateCredentials(FlowConfCredentials flowCredentials, UUID agentId) {
    return securityService
        .getUser()
        .flatMap(
            user -> {
              Command command = new Command();
              command.setAgentId(agentId.toString());
              command.setCommandType(COMMAND_TYPE);
              command.setArguments(createArguments());
              command.setCreatedBy(
                  user.getEmail());
              return commandControllerApi.saveCommand(command);
            })
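        // The filter and the repeat live inside this flatMap, so only findById is
        // resubscribed; saveCommand and getUser are subscribed to once.
        .flatMap(saveResponse -> commandControllerApi.findById(saveResponse.getCommandId())
            // Drop any response that is not SUCCESS, leaving the inner Mono empty.
            .filter(findResponse -> findResponse.getStatus().equals(OaCommandStatus.SUCCESS))
            // Resubscribe to findById while it is empty (Repeat is from the reactor-extra addon).
            .repeatWhenEmpty(
                Repeat.onlyIf(repeatContext -> true)
                    .exponentialBackoff(Duration.ofSeconds(5), Duration.ofSeconds(5))
                    .timeout(Duration.ofSeconds(30))))
        // Emits true once a SUCCESS response has come through.
        .hasElement();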
}
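
The scoping effect of moving the repeat inside the flatMap can be checked with a small standalone sketch. This is not the asker's code: saveCommand and findById below are hypothetical stand-ins that only count subscriptions, and the polling uses the reactor-core overload repeatWhenEmpty(int, Function) with a short fixed delay instead of the Repeat helper from reactor-extra. It assumes reactor-core is on the classpath.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class RepeatScopeDemo {
    static final AtomicInteger saveCalls = new AtomicInteger();
    static final AtomicInteger findCalls = new AtomicInteger();

    // Hypothetical stand-in for commandControllerApi.saveCommand: returns a command id.
    static Mono<String> saveCommand() {
        return Mono.fromSupplier(() -> "cmd-" + saveCalls.incrementAndGet());
    }

    // Hypothetical stand-in for findById: reports SUCCESS only from the third call on.
    static Mono<String> findById(String id) {
        return Mono.fromSupplier(
            () -> findCalls.incrementAndGet() >= 3 ? "SUCCESS" : "PENDING");
    }

    static Mono<Boolean> poll(Duration delay) {
        return saveCommand()
            .flatMap(id -> findById(id)
                // A non-SUCCESS status is dropped, so the inner Mono completes empty.
                .filter("SUCCESS"::equals)
                // Resubscribe to the inner Mono only, at most 5 times, waiting `delay` between tries.
                .repeatWhenEmpty(5, companion -> companion.delayElements(delay)))
            .hasElement();
    }

    public static void main(String[] args) {
        Boolean ok = poll(Duration.ofMillis(50)).block();
        // prints: true save=1 find=3
        System.out.println(ok + " save=" + saveCalls.get() + " find=" + findCalls.get());
    }
}
```

The save counter stays at 1 while the find counter reaches 3, which is the behaviour the answer relies on. One difference from the Repeat-based version: this overload signals an IllegalStateException when the maximum number of repeats is exceeded, so a caller that wants false in that case would have to handle that error explicitly.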

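For the retryWhen half of the question, one possible variant is to turn the empty result into an error inside the same inner chain, since retryWhen reacts only to errors. The sketch below is an illustration under assumptions, not part of the original answer: findById is a hypothetical stand-in, NoSuchElementException is an arbitrary choice of marker exception, and mapping exhausted retries to false is one way to keep the Mono<Boolean> contract. It uses Retry from reactor.util.retry in reactor-core.

```java
import java.time.Duration;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class RetryOnEmptyDemo {
    static final AtomicInteger findCalls = new AtomicInteger();

    // Hypothetical stand-in for findById: reports SUCCESS from the readyAt-th call on.
    static Mono<String> findById(int readyAt) {
        return Mono.fromSupplier(
            () -> findCalls.incrementAndGet() >= readyAt ? "SUCCESS" : "PENDING");
    }

    static Mono<Boolean> poll(int readyAt, long maxRetries, Duration delay) {
        return findById(readyAt)
            .filter("SUCCESS"::equals)
            // retryWhen ignores an empty completion, so convert "empty" into an error first.
            .switchIfEmpty(Mono.error(() -> new NoSuchElementException("not ready")))
            // Retry only the marker error, with a fixed delay between attempts.
            .retryWhen(Retry.fixedDelay(maxRetries, delay)
                .filter(e -> e instanceof NoSuchElementException))
            .map(status -> true)
            // Once the retries are used up, report false instead of failing.
            .onErrorReturn(Exceptions::isRetryExhausted, false);
    }

    public static void main(String[] args) {
        // Data becomes available on the third call, well within 5 retries.
        System.out.println(poll(3, 5, Duration.ofMillis(50)).block()); // prints: true
    }
}
```

In the asker's method this chain would sit inside the flatMap that calls findById, for the same reason as the repeatWhenEmpty version: placed after the flatMap, the retry would resubscribe to saveCommand and getUser as well.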