如何对断路器的所有重试中调用的断路器进行故障预置 [英] How to make fallback for circuit breaker invoked on all retries on the broken circuit
问题描述
我有以下政策:
var sharedBulkhead = Policy.BulkheadAsync(
maxParallelization: maxParallelizations,
maxQueuingActions: maxQueuingActions,
onBulkheadRejectedAsync: (context) =>
{
Log.Info($"Bulk head rejected => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
return TaskHelper.EmptyTask;
}
);
var retryPolicy = Policy.Handle<HttpRequestException>()
.Or<BrokenCircuitException>().WaitAndRetryAsync(
retryCount: maxRetryCount,
sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)),
onRetryAsync: (exception, calculatedWaitDuration, retryCount, context) =>
{
Log.Error($"Retry => Count: {retryCount}, Wait duration: {calculatedWaitDuration}, Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}, Exception: {exception}.");
return TaskHelper.EmptyTask;
});
var circuitBreaker = Policy.Handle<Exception>(e => (e is HttpRequestException)).CircuitBreakerAsync(
exceptionsAllowedBeforeBreaking: maxExceptionsBeforeBreaking,
durationOfBreak: TimeSpan.FromSeconds(circuitBreakDurationSeconds),
onBreak: (exception, timespan, context) =>
{
Log.Error($"Circuit broken => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}, Exception: {exception}");
},
onReset: (context) =>
{
Log.Info($"Circuit reset => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
}
);
var fallbackForCircuitBreaker = Policy<bool>
.Handle<BrokenCircuitException>()
.FallbackAsync(
fallbackValue: false,
onFallbackAsync: (b, context) =>
{
Log.Error($"Operation attempted on broken circuit => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
return TaskHelper.EmptyTask;
}
);
var fallbackForAnyException = Policy<bool>
.Handle<Exception>()
.FallbackAsync(
fallbackAction: (ct, context) => { return Task.FromResult(false); },
onFallbackAsync: (e, context) =>
{
Log.Error($"An unexpected error occured => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
return TaskHelper.EmptyTask;
}
);
var resilienceStrategy = Policy.WrapAsync(retryPolicy, circuitBreaker, sharedBulkhead);
var policyWrap = fallbackForAnyException.WrapAsync(fallbackForCircuitBreaker.WrapAsync(resilienceStrategy));
现在,仅当所有重试均失败,并且最后一次重试失败并以BrokenCircuitException
时,才调用fallbackForCircuitBreaker
.为了在每次断路重试时调用fallbackForCircuitBreaker
,应该进行哪些更改?
Now, fallbackForCircuitBreaker
is only invoked if all retries fail, and if the last retry fails with BrokenCircuitException
. What changes should be made in order for fallbackForCircuitBreaker
to be invoked every time a retry is made on a broken circuit?
此外,我正在使用sharedBulkHead
,它是服务中的实例字段,并在构造函数中初始化.这是一个好习惯吗?理想情况下,在onBulkheadRejectedAsync
上应该做什么?我可以修改重试策略来处理批量拒绝吗?
Also, I am using a sharedBulkHead
which is an instance field in the service and is initialized in the constructor. Is that a good practise? What is to be done ideally on onBulkheadRejectedAsync
? Can I modify the retry policy to handle bulk head rejection as well?
推荐答案
现在,仅当所有重试均失败且最后一次重试失败并带有BrokenCircuitException时,才调用fallbackForCircuitBreaker.为了使每次在断路电路上重试时调用fallbackForCircuitBreaker,应该进行哪些更改?
Now, fallbackForCircuitBreaker is only invoked if all retries fail, and if the last retry fails with BrokenCircuitException. What changes should be made in order for fallbackForCircuitBreaker to be invoked every time a retry is made on a broken circuit?
请参见 PolicyWrap文档,尤其是
See the PolicyWrap documentation, especially the diagrams and description of operation. Policies in a PolicyWrap act like a sequence of nested middleware around the call:
- 最外层(按读取顺序最左)策略执行下一个内层,然后执行下一个内层,依此类推;直到最里面的策略执行用户委托为止;
- 抛出的异常通过层向外冒泡(直到处理)
因此,要使(等于)每次尝试调用fallbackForCircuitBreaker
,请将其移到重试策略中.
So, to make (an equivalent to) fallbackForCircuitBreaker
invoked per try, move it inside the retry policy.
但是,当前的fallbackForCircuitBreaker
将抛出的异常替换为返回值false
,这听起来像是您从每次尝试回退中寻求的是日志,然后进行下一次尝试".为此的技术是将后备用作日志,然后重新抛出,以便您的重试策略仍然可以响应(重新抛出)异常.所以:
The current fallbackForCircuitBreaker
however substitutes the thrown exception with a return value false
, whereas it sounds like what you are seeking from fallback-per-try is a 'log, then make the next try'. The technique for that is to use fallback as log then rethrow, so that your retry policy can still respond to the (rethrown) exception. So:
var sharedBulkhead = /* as before */;
var retryPolicy = /* as before */;
var fallbackForCircuitBreaker = /* as before */;
var logCircuitBreakerBrokenPerTry = Policy<bool>
.Handle<BrokenCircuitException>()
.FallbackAsync(
fallbackValue: false,
onFallbackAsync: (outcome, context) =>
{
Log.Error($"Operation attempted on broken circuit => Policy Wrap: {context.PolicyWrapKey}, Policy: {context.PolicyKey}, Endpoint: {context.OperationKey}");
throw outcome.Exception;
}
);
var fallbackForAnyException = /* as before */;
var resilienceStrategy = Policy.WrapAsync(retryPolicy, logCircuitBreakerBrokenPerTry, circuitBreaker, sharedBulkhead);
var policyWrap = fallbackForAnyException.WrapAsync(fallbackForCircuitBreaker.WrapAsync(resilienceStrategy));
我可以修改重试策略来处理批量头拒绝吗?
Can I modify the retry policy to handle bulk head rejection as well?
Polly舱壁文档指出该策略抛出BulkheadRejectedException
,所以:
The Polly bulkhead documentation states the policy throws BulkheadRejectedException
, so:
var retryPolicy = Policy
.Handle<HttpRequestException>()
.Or<BrokenCircuitException>()
.Or<BulkheadRejectedException>()
/* etc */
理想情况下在onBulkheadRejectedAsync上要做什么?
What is to be done ideally on onBulkheadRejectedAsync?
您可以登录.广义上讲,您可以减轻多余的负载,也可以使用隔板抑制作为触发来水平扩展以增加容量. Polly文档提供了更多讨论 和此处.
You can log. Broadly speaking, you can shed the excess load, or use the bulkhead rejection as a trigger to scale horizontally to increase capacity. The Polly documentation provides more discussion here and here.
此外,我正在使用sharedBulkHead,它是服务中的实例字段,并在构造函数中初始化.这是一个好习惯吗?
Also, I am using a sharedBulkHead which is an instance field in the service and is initialized in the constructor. Is that a good practise?
这取决于. Bulkhead策略实例的生命周期必须为受管制的呼叫(而不是每个呼叫),以便让Bulkhead策略实例的状态决定并发执行的呼叫数.
It depends. The lifetime of the Bulkhead policy instance must be long-lived across the governed calls, not per call, in order for the state of the Bulkhead policy instance to govern the number of calls executing concurrently.
- 如果该服务作为长期存在的单例存在,则将隔板策略保留在实例字段中将是适当的,因为隔板策略实例也将长期存在.
- 如果服务类的实例是由DI容器作为临时/按请求创建的,则需要确保隔板策略实例仍然有效并在并发请求之间共享(例如,使它成为
static
) ,而不是按请求. - 如果通过HttpClientFactory在HttpClient上配置了隔板的实例,请遵循
- If the service exists as a long-lived singleton, holding the bulkhead policy in an instance field would be appropriate as the bulkhead policy instance would also be long lived.
- If instances of the service class are created as transient/per-request by a DI container, you would need to ensure the bulkhead policy instance was still long-lived and shared across concurrent requests (eg by making it
static
), not per-request. - If instances of the bulkhead are configured on an HttpClient by HttpClientFactory, follow the notes on scoping stateful policies with HttpClientFactory in the Polly and HttpClientFactory documentation.
这篇关于如何对断路器的所有重试中调用的断路器进行故障预置的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!