Webflux repeatWhenEmpty or retryWhen
Question
I use Spring Boot and reactive programming with WebFlux. I want to keep calling my endpoint until data is available (that is, until something is returned).
I want to call commandControllerApi.findById until a displayCommand is returned with status == SUCCESS. How do I tell WebFlux that this part of my chain should be called, for example, 5 times, given that the data should appear in my database after 5-10 seconds?
I think the current code causes the whole chain to be called again, not only the relevant part of my chain (.flatMap(commandResponse -> commandControllerApi.findById(commandResponse.getCommandId()))).
My code:
public Mono<Boolean> validateCredentials(FlowConfCredentials flowCredentials, UUID agentId) {
    return securityService
        .getUser()
        .flatMap(
            user -> {
                Command command = new Command();
                command.setAgentId(agentId.toString());
                command.setCommandType(COMMAND_TYPE);
                command.setArguments(createArguments());
                command.setCreatedBy(user.getEmail());
                return commandControllerApi.saveCommand(command);
            })
        // .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(5)))
        .flatMap(commandResponse -> commandControllerApi.findById(commandResponse.getCommandId()))
        .filter(displayCommand -> displayCommand.getStatus().equals(OaCommandStatus.SUCCESS))
        .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(5)))
        // .repeatWhenEmpty(
        //     Repeat.onlyIf(repeatContext -> true)
        //         .exponentialBackoff(Duration.ofSeconds(5), Duration.ofSeconds(5))
        //         .timeout(Duration.ofSeconds(30)))
        .filter(
            commandResponse ->
                commandResponse.getStatus() != null
                    && commandResponse.getStatus().equals(CommandStatus.SUCCESS))
        .map(commandResponse -> true)
        .switchIfEmpty(Mono.just(false));
}
Below is the method that calls the method above:
public Flux<CredConfiguration> saveCredentials(
        Mono<FlowConfCredentials> flowCredentials, UUID agentId) {
    return flowCredentials
        .filterWhen(cred -> validationService.validateCredentials(cred, agentId))
        .flatMapMany(
            flowConfCredentials -> {
                if (condition1()) {
                    return caveCredentials(flowConfCredentials);
                }
                if (condition2()) {
                    return saveCredentialsForUser(flowConfCredentials.getExistingCredentials());
                }
                return Flux.error(new EmptyConfigurationException(CREDENTIALS_MESSAGE));
            });
}
Recommended answer
To repeat only the subscription to the Mono returned by findById, without resubscribing to the upstream saveCommand/getUser, move the filter/repeatWhenEmpty inside the flatMap that calls findById.
public Mono<Boolean> validateCredentials(FlowConfCredentials flowCredentials, UUID agentId) {
    return securityService
        .getUser()
        .flatMap(
            user -> {
                Command command = new Command();
                command.setAgentId(agentId.toString());
                command.setCommandType(COMMAND_TYPE);
                command.setArguments(createArguments());
                command.setCreatedBy(user.getEmail());
                return commandControllerApi.saveCommand(command);
            })
        // Everything inside this flatMap applies only to the Mono returned by findById,
        // so a repeat resubscribes to findById and not to getUser/saveCommand.
        .flatMap(saveResponse -> commandControllerApi.findById(saveResponse.getCommandId())
            // Complete empty unless the command has reached SUCCESS.
            .filter(findResponse -> findResponse.getStatus().equals(OaCommandStatus.SUCCESS))
            // Repeat here is reactor.retry.Repeat from the reactor-extra add-on.
            .repeatWhenEmpty(
                Repeat.onlyIf(repeatContext -> true)
                    .exponentialBackoff(Duration.ofSeconds(5), Duration.ofSeconds(5))
                    .timeout(Duration.ofSeconds(30))))
        // true if a SUCCESS command was emitted, false if the Mono completed empty.
        .hasElement();
}
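The difference between the two placements can be checked with a small stand-alone sketch. It assumes only reactor-core on the classpath and uses the built-in repeatWhenEmpty(int, Function) overload instead of Repeat from reactor-extra. The saveCommand and findById methods, the Counters class, the string statuses, and the 20 ms delay are hypothetical stand-ins for the real API, not the original code. Note also that retryWhen resubscribes only on an error signal, while filter completes empty without an error, which is why the sketch uses repeatWhenEmpty in both variants.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

public class RepeatPlacementDemo {

    // Hypothetical helper: counts how often each stand-in call is actually executed.
    static final class Counters {
        final AtomicInteger save = new AtomicInteger();
        final AtomicInteger find = new AtomicInteger();
    }

    // Stand-in for commandControllerApi.saveCommand: runs once per subscription.
    static Mono<String> saveCommand(Counters c) {
        return Mono.fromCallable(() -> {
            c.save.incrementAndGet();
            return "cmd-1";
        });
    }

    // Stand-in for commandControllerApi.findById: reports SUCCESS from the third call on.
    static Mono<String> findById(Counters c, String id) {
        return Mono.fromCallable(() -> c.find.incrementAndGet() < 3 ? "PENDING" : "SUCCESS");
    }

    // Repeat applied to the whole chain: every repeat resubscribes to saveCommand too.
    static Mono<Boolean> repeatOutside(Counters c) {
        return saveCommand(c)
            .flatMap(id -> findById(c, id))
            .filter("SUCCESS"::equals)
            .repeatWhenEmpty(5, companion -> companion.delayElements(Duration.ofMillis(20)))
            .hasElement();
    }

    // Repeat applied inside the flatMap: only findById is resubscribed.
    static Mono<Boolean> repeatInside(Counters c) {
        return saveCommand(c)
            .flatMap(id -> findById(c, id)
                .filter("SUCCESS"::equals)
                .repeatWhenEmpty(5, companion -> companion.delayElements(Duration.ofMillis(20))))
            .hasElement();
    }

    public static void main(String[] args) {
        Counters outside = new Counters();
        Boolean outsideResult = repeatOutside(outside).block();
        System.out.println("outside: result=" + outsideResult
            + " save=" + outside.save.get() + " find=" + outside.find.get());

        Counters inside = new Counters();
        Boolean insideResult = repeatInside(inside).block();
        System.out.println("inside: result=" + insideResult
            + " save=" + inside.save.get() + " find=" + inside.find.get());
    }
}
```

In this sketch both variants end with true after three findById calls, but the outer placement also runs saveCommand three times, while the inner placement runs it once. If the repeat limit passed to repeatWhenEmpty is exceeded, this overload signals an IllegalStateException, so a real implementation would likely need to map that error to false if it should behave like the empty case.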