Spring WebClient - 如何根据响应头延迟重试 [英] Spring WebClient - how to retry with delay based on response header
问题描述
我一直在学习 Spring Webflux 和反应式编程,但遇到了一个问题,我试图使用 Spring Webclient 解决重试逻辑.我已经创建了一个客户端并成功调用了一个返回一些 JSON 数据的外部网络服务 GET 端点.
I've been learning Spring Webflux and reactive programming and have gotten stuck on a problem I'm trying to solve around retry logic using Spring Webclient. I've created a client and made successful calls to an external web-service GET endpoint that returns some JSON data.
当外部服务以 503 - Service Unavailable
状态响应时,响应包含一个 Retry-After
标头,该标头的值指示我在重试前应等待多长时间请求.我想在 Spring Webflux/Reactor 中找到一种方法来告诉 webClient 在 X 周期后重试它的请求,其中 X 是现在与我从响应标头中解析出的 DateTime 之间的差异.
When the external service responds with a 503 - Service Unavailable
status, the response includes a Retry-After
header with a value that indicates how long I should wait before retrying the request. I want to find a way within Spring Webflux/Reactor to tell the webClient to retry it's request after X period, where X is the difference between now and the DateTime that I parse out of the response header.
public <T> Mono<T> get(final String url, Class<T> clazz) {
return webClient
.get().uri(url)
.retrieve()
.bodyToMono(clazz);
}
WebClient 生成器
我使用构建器创建了上述方法中使用的 webClient
变量,并将其作为实例变量存储在类中.
WebClient Builder
I use a builder to create the webClient
variable used in the above method, and it's stored as an instance variable in the class.
webClientBuilder = WebClient.builder();
webClientBuilder.codecs(clientCodecConfigurer -> {
clientCodecConfigurer.defaultCodecs();
clientCodecConfigurer.customCodecs().register(new Jackson2JsonDecoder());
clientCodecConfigurer.customCodecs().register(new Jackson2JsonEncoder());
});
webClient = webClientBuilder.build();
重试时间
我已尝试通过 Retry
类理解和使用 retryWhen
方法,但无法确定是否可以访问或传递响应标头值
Retry When
I've tried to understand and use the retryWhen
method with the Retry
class, but can't figure out if I can access or pass through the response header value there.
public <T> Mono<T> get(final String url, Class<T> clazz) {
return webClient
.get().uri(url)
.retrieve()
.bodyToMono(clazz);
.retryWhen(new Retry() {
@Override
public Publisher<?> generateCompanion(final Flux<RetrySignal> retrySignals) {
// Can I use retrySignals or retryContext to find the response header somehow?
// If I can find the response header, how to return a "yes-retry" response?
}
})
}
具有额外逻辑和数据库交互的过滤器
我还尝试做一些额外的逻辑并在 WebClient.Builder 中使用过滤器,但这只会让我停止新请求(调用 #get
),直到先前建立的 Retry-After 值已过.
Filter(s) with Extra Logic and DB Interaction
I've also tried to do some extra logic and use filters with the WebClient.Builder, but that only gets me to a point of halting a new request (call to #get
) until a previously established Retry-After value has elapsed.
webClientBuilder = WebClient.builder();
webClientBuilder.codecs(clientCodecConfigurer -> {
clientCodecConfigurer.defaultCodecs();
clientCodecConfigurer.customCodecs().register(new Jackson2JsonDecoder());
clientCodecConfigurer.customCodecs().register(new Jackson2JsonEncoder());
});
webClientBuilder.filter(ExchangeFilterFunction.ofRequestProcessor(clientRequest -> {
final Clock clock = Clock.systemUTC();
final int id = (int) clientRequest.attribute("id"); // id is saved as an attribute for the request, pull it out here
final long retryAfterEpochMillis = // get epoch millisecond from DB for id
if(epoch is in the past) {
return Mono.just(clientRequest);
} else { // have to wait until epoch passes to send request
return Mono.just(clientRequest).delayElement(Duration.between(clock.instant(), Instant.ofEpochMilli(retryAfterEpochMillis)));
}
})
);
webClient = webClientBuilder.build();
.onStatus(HttpStatus::isError, response -> {
final List<String> retryAfterHeaders = response.headers().header("Retry-After");
if(retryAfterHeaders.size() > 0) {
final long retryAfterEpochMillis = // parse millisecond epoch time from header
// Save millisecond time to DB associated to specific id
}
return response.bodyToMono(String.class).flatMap(body ->
Mono.error(new RuntimeException(
String.format("Request url {%s} failed with status {%s} and reason {%s}",
url,
response.rawStatusCode(),
body))));
})
感谢任何帮助,如果我能提供更多上下文数据来提供帮助,我会的.
Any help is appreciated, and if I can provide more contextual data to help, I will.
推荐答案
1.在重试构建器中检索标头
public class WebClientStatefulRetry3 {
public static void main(String[] args) {
WebClient webClient = WebClient.create();
call(webClient)
.retryWhen(Retry.indefinitely()
.filter(ex -> ex instanceof WebClientResponseException.ServiceUnavailable)
.doBeforeRetryAsync(signal -> Mono.delay(calculateDelay(signal.failure())).then()))
.block();
}
private static Mono<String> call(WebClient webClient) {
return webClient.get()
.uri("http://mockbin.org/bin/b2a26614-0219-4018-9446-c03bc1868ebf")
.retrieve()
.bodyToMono(String.class);
}
private static Duration calculateDelay(Throwable failure) {
String headerValue = ((WebClientResponseException.ServiceUnavailable) failure).getHeaders().get("Retry-After").get(0);
return // calculate delay here from header and current time;
}
}
2.使用扩展运算符访问前一个响应并生成下一个
public class WebClientRetryWithExpand {
public static void main(String[] args) {
WebClient webClient = WebClient.create();
call(webClient)
.expand(prevResponse -> {
List<String> header = prevResponse.headers.header("Retry-After");
if (header.isEmpty()) {
return Mono.empty();
}
long delayInMillis = // calculate delay from header and current time
return Mono.delay(Duration.ofMillis(delayInMillis))
.then(call(webClient));
})
.last()
.block();
}
private static Mono<ResponseWithHeaders> call(WebClient webClient) {
return webClient.get()
.uri("https://example.com")
.exchangeToMono(response -> response.bodyToMono(String.class)
.map(rawResponse -> new ResponseWithHeaders(rawResponse, response.headers())));
}
@Data
static class ResponseWithHeaders {
private final String rawResponse;
private final ClientResponse.Headers headers;
}
}
这篇关于Spring WebClient - 如何根据响应头延迟重试的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!