Rx.Net + Reactive-Ui + MahApps.Metro - 重复 &使用门控对话框重试异步请求 [英] Rx.Net + Reactive-Ui + MahApps.Metro - Repeating & retrying asynchronous request with gated dialogs
问题描述
给定一个可观察的形式:
Given an observable of form:
var fetchTimer = Observable.Timer(TimeSpan.FromSeconds(1));
var stateFetcher =
Observable.FromAsync(async () => await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest()));
var delayedFetch = fetchTimer.SelectMany(stateFetcher);
这提供了在延迟后获取状态的方法.
This provides the means for fetching the state after a delay.
修改可以定期执行此操作:
A modification can do this at regular intervals:
var regularFetch = Observable.Interval(TimeSpan.FromSeconds(5)).Select(_ => stateFetcher).Switch();
这每 5 秒请求一个值.
This requests a value every 5 seconds.
但是请求可能会失败(远程服务无法访问等);考虑到这一点,尝试生成重试操作的机制以及提醒用户的钩子可能会很棘手.
The request can fail however (remote service unreachable etc); with that in mind, trying to produce a mechanism to retry the operation, as well as hooks for alerting the user can be tricky.
在失败时暂停基于计时器的操作 - 这个问题由我介绍了对此的初步方法,以及一些尝试/部分解决方案.
Suspending a timer-based operation on failure - this question by me covers the initial approach to this, as well as some of the attempts / a partial solution.
在这里我想分享一下我得到的解决方案.
Here I want to share the solution I arrived at.
推荐答案
我们可以将问题总结如下:
We can summarise the problem as follows:
- 我们有一个可以产生值或错误的可观察源
- 如果发生错误,我们需要做一些事情
- 从第一个错误开始,我们希望通过通知进行多次尝试
- 如果这些尝试都失败了,我们想做点什么
- 在流程结束时,我们希望重新启动所有流程.
所以:
错误
->初始对话框
->在每次尝试时重试通知
->再做一遍
在整个过程中的任何时候,成功的价值排放都应该绕过一切并回流.
At any point throughout this process a successful value emission should bypass everything and flow back out.
考虑到这种高度自以为是的方法,这是我创建的实用程序:
With that highly opinionated approach in mind, here is the utility I created:
public static IObservable<T> WithGatedRetry<T>(
this IObservable<T> source,
int retriesPerCycle,
Func<Exception, Task> onInitialFailure,
Func<Action<Func<Task>>, Task<Func<Exception, int, Task>>> retryNotificationBlock,
Func<Exception, Task> onFailedCycle)
{
IObservable<T> GetInitialHandler(Exception e) =>
Observable.FromAsync(() => onInitialFailure(e))
.Select(_ => (T)default);
IObservable<T> GetCycleFailureHandler(Exception e) =>
Observable.FromAsync(() => onFailedCycle(e))
.Select(_ => (T)default);
IObservable<T> GetRetryFlow() =>
Observable.Create<T>(async sub =>
{
var attempt = 1;
Func<Task> disposeCallback = () => Task.CompletedTask;
var notifier = await retryNotificationBlock(dc =>
{
disposeCallback = dc;
});
await notifier(null, 1);
return
source
.Do(
_ =>
{
},
async (Exception e) =>
{
if (attempt + 1 <= retriesPerCycle)
{
await notifier(e, ++attempt);
}
}
)
.Retry(retriesPerCycle)
.Finally(async () =>
{
if (disposeCallback != null)
{
await disposeCallback();
}
})
.Subscribe(
val => { sub.OnNext(val); sub.OnCompleted(); },
(Exception e) => { sub.OnError(e); }
);
});
IObservable<T> GetCycleFlow() =>
GetRetryFlow()
.Catch((Exception e) =>
GetCycleFailureHandler(e)
.Select(_ => GetCycleFlow())
.Switch()
)
.Retry();
IObservable<T> GetPrimaryFlow() =>
source
.Catch((Exception e) => GetInitialHandler(e))
.Select(val =>
EqualityComparer<T>.Default.Equals(val, default)
? GetCycleFlow().Select(_ => GetPrimaryFlow()).Switch()
: GetPrimaryFlow().StartWith(val)
)
.Switch();
return GetPrimaryFlow();
}
我完全承认这可能不是最好的方法,并且通知块中有一些回调内部回调kludge(对于每次重试尝试),以支持清理一次重试循环"已完成(成功或不成功).
I'll fully admit this may not be the best way to do it, and there's a bit of a callback-inside-a-callback kludge in the notification block (for each retry attempt), in order to support cleaning up once an retry "cycle" has been completed (successfully or otherwise).
用法如下:
var latestState =
Observable.SelectMany(fetchTimer, stateFetcher)
.WithGatedRetry(
3,
async ex =>
{
// show initial error dialog
},
async (disposerHook) =>
{
// Show the "attempting retries" dialog
disposerHook(async () =>
{
// Close the "attempting retries" dialog
// and do any cleanup
});
return async (Exception ex, int attempt) =>
{
// Update the dialog
// ex is the exception produced by the just-completed attempt
};
},
async ex =>
{
// Show the "We still can't quite get it" dialog
// after this task completes, the entire process starts again
}
)
.Publish();
这种方法允许定制的挂钩点,并按预期传递成功的价值.
This approach allows for the tailored hook-points, and flows successful values as expected.
事实上,下游订阅者应该只在成功提供值时看到一个值 - 他们也不应该看到错误,因为它处于无限重试中.
In fact, downstream subscribers should only ever see a value when one is successfully provided - they also shouldn't see an error as it sits in an infinite retry.
与原始问题中的解决方案相比,这使用 Select
+ Switch
而不是 SelectMany
以确保内部可观察对象是正确取消订阅和处理.
In comparison with the solution in the original question, this uses Select
+ Switch
as opposed to SelectMany
in order to ensure inner observables are correctly unsubscribed and disposed.
这篇关于Rx.Net + Reactive-Ui + MahApps.Metro - 重复 &使用门控对话框重试异步请求的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!