Rx.Net + Reactive-Ui + MahApps.Metro - 重复 &使用门控对话框重试异步请求 [英] Rx.Net + Reactive-Ui + MahApps.Metro - Repeating & retrying asynchronous request with gated dialogs

查看:71
本文介绍了Rx.Net + Reactive-Ui + MahApps.Metro - 重复 &使用门控对话框重试异步请求的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

给定一个可观察的形式:

Given an observable of form:

var fetchTimer = Observable.Timer(TimeSpan.FromSeconds(1));
var stateFetcher =
    Observable.FromAsync(async () => await _robotsClient.GetRobotsStateAsync(new GetRobotsStateRequest()));

var delayedFetch = fetchTimer.SelectMany(stateFetcher);

这提供了在延迟后获取状态的方法.

This provides the means for fetching the state after a delay.

修改可以定期执行此操作:

A modification can do this at regular intervals:

var regularFetch = Observable.Interval(TimeSpan.FromSeconds(5)).Select(_ => stateFetcher).Switch();

这每 5 秒请求一个值.

This requests a value every 5 seconds.

但是请求可能会失败(远程服务无法访问等);考虑到这一点,尝试生成重试操作的机制以及提醒用户的钩子可能会很棘手.

The request can fail however (remote service unreachable etc); with that in mind, trying to produce a mechanism to retry the operation, as well as hooks for alerting the user can be tricky.

在失败时暂停基于计时器的操作 - 这个问题由我介绍了对此的初步方法,以及一些尝试/部分解决方案.

Suspending a timer-based operation on failure - this question by me covers the initial approach to this, as well as some of the attempts / a partial solution.

在这里我想分享一下我得到的解决方案.

Here I want to share the solution I arrived at.

推荐答案

我们可以将问题总结如下:

We can summarise the problem as follows:

  1. 我们有一个可以产生值错误的可观察源
  2. 如果发生错误,我们需要做一些事情
  3. 从第一个错误开始,我们希望通过通知进行多次尝试
  4. 如果这些尝试都失败了,我们想做点什么
  5. 在流程结束时,我们希望重新启动所有流程.

所以:

错误 ->初始对话框 ->在每次尝试时重试通知 ->再做一遍

在整个过程中的任何时候,成功的价值排放都应该绕过一切并回流.

At any point throughout this process a successful value emission should bypass everything and flow back out.

考虑到这种高度自以为是的方法,这是我创建的实用程序:

With that highly opinionated approach in mind, here is the utility I created:

public static IObservable<T> WithGatedRetry<T>(
    this IObservable<T> source,
    int retriesPerCycle,
    Func<Exception, Task> onInitialFailure,
    Func<Action<Func<Task>>, Task<Func<Exception, int, Task>>> retryNotificationBlock,
    Func<Exception, Task> onFailedCycle)
{
    IObservable<T> GetInitialHandler(Exception e) =>
        Observable.FromAsync(() => onInitialFailure(e))
        .Select(_ => (T)default);

    IObservable<T> GetCycleFailureHandler(Exception e) =>
        Observable.FromAsync(() => onFailedCycle(e))
        .Select(_ => (T)default);

    IObservable<T> GetRetryFlow() =>
        Observable.Create<T>(async sub =>
        {
            var attempt = 1;
            Func<Task> disposeCallback = () => Task.CompletedTask;
            var notifier = await retryNotificationBlock(dc =>
            {
                disposeCallback = dc;
            });

            await notifier(null, 1);

            return
                source
                .Do(
                     _ =>
                    {
                    },
                    async (Exception e) =>
                    {
                        if (attempt + 1 <= retriesPerCycle)
                        {
                            await notifier(e, ++attempt);
                        }
                    }
                )
                .Retry(retriesPerCycle)
                .Finally(async () =>
                {
                    if (disposeCallback != null)
                    {
                        await disposeCallback();
                    }
                })
                .Subscribe(
                    val => { sub.OnNext(val); sub.OnCompleted(); },
                    (Exception e) => { sub.OnError(e); }
                );
        });

    IObservable<T> GetCycleFlow() =>
        GetRetryFlow()
        .Catch((Exception e) =>
            GetCycleFailureHandler(e)
            .Select(_ => GetCycleFlow())
            .Switch()
        )
        .Retry();

    IObservable<T> GetPrimaryFlow() =>
        source
        .Catch((Exception e) => GetInitialHandler(e))
        .Select(val =>
            EqualityComparer<T>.Default.Equals(val, default)
            ? GetCycleFlow().Select(_ => GetPrimaryFlow()).Switch()
            : GetPrimaryFlow().StartWith(val)
        )
        .Switch();

    return GetPrimaryFlow();
}

我完全承认这可能不是最好的方法,并且通知块中有一些回调内部回调kludge(对于每次重试尝试),以支持清理一次重试循环"已完成(成功或不成功).

I'll fully admit this may not be the best way to do it, and there's a bit of a callback-inside-a-callback kludge in the notification block (for each retry attempt), in order to support cleaning up once an retry "cycle" has been completed (successfully or otherwise).

用法如下:

var latestState =
    Observable.SelectMany(fetchTimer, stateFetcher)
    .WithGatedRetry(
        3,
        async ex =>
        {
            // show initial error dialog
        },
        async (disposerHook) =>
        {
            // Show the "attempting retries" dialog

            disposerHook(async () =>
            {
                // Close the "attempting retries" dialog
                // and do any cleanup
            });

            return async (Exception ex, int attempt) =>
            {
                // Update the dialog
                // ex is the exception produced by the just-completed attempt
            };
        },
        async ex =>
        {
           // Show the "We still can't quite get it" dialog
           // after this task completes, the entire process starts again
        }
    )
    .Publish();

这种方法允许定制的挂钩点,并按预期传递成功的价值.

This approach allows for the tailored hook-points, and flows successful values as expected.

事实上,下游订阅者应该只在成功提供值时看到一个值 - 他们也不应该看到错误,因为它处于无限重试中.

In fact, downstream subscribers should only ever see a value when one is successfully provided - they also shouldn't see an error as it sits in an infinite retry.

与原始问题中的解决方案相比,这使用 Select + Switch 而不是 SelectMany 以确保内部可观察对象是正确取消订阅和处理.

In comparison with the solution in the original question, this uses Select + Switch as opposed to SelectMany in order to ensure inner observables are correctly unsubscribed and disposed.

这篇关于Rx.Net + Reactive-Ui + MahApps.Metro - 重复 &amp;使用门控对话框重试异步请求的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆