反应式扩展和重试 [英] Reactive Extensions and Retry

查看:52
本文介绍了反应式扩展和重试的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

今天早上,一系列文章突然出现在我的视线中.它始于这个问题,这导致了原始示例e 和 源代码在 GitHub 上.

So a series of articles popped on my radar this morning. It started with this question, which lead to the original example and source code on GitHub.

我稍微重写了一下,所以我可以开始在控制台和服务应用程序中使用它:

I rewrote it slightly, so I can start using it in Console and Service applications:

public static class Extensions
{
    static readonly TaskPoolScheduler Scheduler = new TaskPoolScheduler(new TaskFactory());

    // Licensed under the MIT license with <3 by GitHub

    /// <summary>
    /// An exponential back off strategy which starts with 1 second and then 4, 8, 16...
    /// </summary>
    [SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")]
    public static readonly Func<int, TimeSpan> ExponentialBackoff = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

    /// <summary>
    /// A linear strategy which starts with 1 second and then 2, 3, 4...
    /// </summary>
    [SuppressMessage("Microsoft.Security", "CA2104:DoNotDeclareReadOnlyMutableReferenceTypes")]
    public static readonly Func<int, TimeSpan> LinearStrategy = n => TimeSpan.FromSeconds(1*n);

    /// <summary>
    /// Returns a cold observable which retries (re-subscribes to) the source observable on error up to the 
    /// specified number of times or until it successfully terminates. Allows for customizable back off strategy.
    /// </summary>
    /// <param name="source">The source observable.</param>
    /// <param name="retryCount">The number of attempts of running the source observable before failing.</param>
    /// <param name="strategy">The strategy to use in backing off, exponential by default.</param>
    /// <param name="retryOnError">A predicate determining for which exceptions to retry. Defaults to all</param>
    /// <param name="scheduler">The scheduler.</param>
    /// <returns>
    /// A cold observable which retries (re-subscribes to) the source observable on error up to the 
    /// specified number of times or until it successfully terminates.
    /// </returns>
    [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")]
    public static IObservable<T> RetryWithBackoffStrategy<T>(
        this IObservable<T> source,
        int retryCount = 3,
        Func<int, TimeSpan> strategy = null,
        Func<Exception, bool> retryOnError = null,
        IScheduler scheduler = null)
    {
        strategy = strategy ?? ExponentialBackoff;
        scheduler = scheduler ?? Scheduler;

        if (retryOnError == null)
            retryOnError = e => true;

        int attempt = 0;

        return Observable.Defer(() =>
        {
            return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1), scheduler))
                .Select(item => new Tuple<bool, T, Exception>(true, item, null))
                .Catch<Tuple<bool, T, Exception>, Exception>(e => retryOnError(e)
                    ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                    : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
        })
        .Retry(retryCount)
        .SelectMany(t => t.Item1
            ? Observable.Return(t.Item2)
            : Observable.Throw<T>(t.Item3));
    }
}

现在为了测试它是如何工作的,我编写了这个小程序:

Now to test how it works, I've written this small program:

class Program
{
    static void Main(string[] args)
    {
        int tryCount = 0;
        var cts = new CancellationTokenSource();

        var sched = new TaskPoolScheduler(new TaskFactory());
        var source = Observable.Defer(
            () =>
            {
                Console.WriteLine("Action {0}", tryCount);
                var a = 5/tryCount++;
                return Observable.Return("yolo");
            });

        source.RetryWithBackoffStrategy(scheduler: sched, strategy: Extensions.LinearStrategy, retryOnError: exception => exception is DivideByZeroException);

        while (!cts.IsCancellationRequested)
            source.Subscribe(
                res => { Console.WriteLine("Result: {0}", res); },
                ex =>
                {
                    Console.WriteLine("Error: {0}", ex.Message); 

                },
                () =>
                {
                    cts.Cancel();
                    Console.WriteLine("End Processing after {0} attempts", tryCount);
                });
    }
}

一开始我以为,订阅的事件,会自动触发所有后续的retirement.事实并非如此,所以我必须实现一个取消令牌并循环,直到它发出所有 reties 已用尽的信号.

Initially I have thought, that the event of subscription, will automatically trigger all the subsequent retires. That was not the case, so I had to implement a Cancellation Token and loop until it signals that all reties have been exhausted.

另一种选择是使用 AutoResetEvent:

The other option is to use AutoResetEvent:

class Program
{
    static void Main(string[] args)
    {
        int tryCount = 0;
        var auto = new AutoResetEvent(false);

        var source = Observable.Defer(
            () =>
            {
                Console.WriteLine("Action {0}", tryCount);
                var a = 5/tryCount++;
                return Observable.Return("yolo");
            });

        source.RetryWithBackoffStrategy(strategy: Extensions.LinearStrategy, retryOnError: exception => exception is DivideByZeroException);

        while (!auto.WaitOne(1))
        {
            source.Subscribe(
                res => { Console.WriteLine("Result: {0}", res); },
                ex =>
                {
                    Console.WriteLine("Error: {0}", ex.Message);
                },
                () =>
                {
                    Console.WriteLine("End Processing after {0} attempts", tryCount);
                    auto.Set();
                });
        }
    }
}

在这两种情况下,它都会显示以下几行:

In both scenarios it will display these lines:

Action 0
Error: Attempted to divide by zero.
Action 1
Result: yolo
End Processing after 2 attempts

我对这群人的问题是:这是使用此扩展程序的最佳方式吗?或者有没有办法订阅 Observable 以便它自己重新触发,直到重试次数?

The question I have to this crowd is: Is this the best way to use this extension? Or is there a way to subscribe to the Observable so it will re-fire itself, up to the number of retries?

最终更新

根据布兰登的建议,这是订阅的正确方式:

Based on Brandon's suggestion, this is the proper way of subscribing:

internal class Program
{
    #region Methods

    private static void Main(string[] args)
    {
        int tryCount = 0;
        IObservable<string> source = Observable.Defer(
            () =>
            {
                Console.WriteLine("Action {0}", tryCount);
                int a = 5 / tryCount++;
                return Observable.Return("yolo");
            });
        source.RetryWithBackoffStrategy(strategy: Extensions.ExponentialBackoff, retryOnError: exception => exception is DivideByZeroException, scheduler: Scheduler.Immediate)
            .Subscribe(
                res => { Console.WriteLine("Result: {0}", res); },
                ex => { Console.WriteLine("Error: {0}", ex.Message); },
                () =>
                {
                    Console.WriteLine("End Processing after {0} attempts", tryCount);
                });
    }

    #endregion
}

输出会略有不同:

Action 0
Action 1
Result: yolo
End Processing after 2 attempts

事实证明这是一个非常有用的扩展.这是如何使用它的另一个示例,其中使用委托给出了策略和错误处理.

This turned out to be quite useful extension. Here is another example how it can be used, where strategy and error processing is given using delegates.

internal class Program
{
    #region Methods

    private static void Main(string[] args)
    {
        int tryCount = 0;
        IObservable<string> source = Observable.Defer(
            () =>
            {
                Console.WriteLine("Action {0}", tryCount);
                int a = 5 / tryCount++;
                return Observable.Return("yolo");
            });
        source.RetryWithBackoffStrategy(
            strategy: i => TimeSpan.FromMilliseconds(1),
            retryOnError: exception =>
            {
                if (exception is DivideByZeroException)
                {
                    Console.WriteLine("Tried to divide by zero");
                    return true;
                }
                return false;
            },
            scheduler: Scheduler.Immediate).Subscribe(
                res => { Console.WriteLine("Result: {0}", res); },
                ex => { Console.WriteLine("Error: {0}", ex.Message); },
                () =>
                {
                    Console.WriteLine("Succeeded after {0} attempts", tryCount);
                });
    }

    #endregion
}

输出:

Action 0
Tried to divide by zero
Action 1
Result: yolo
Succeeded after 2 attempts

推荐答案

是的,Rx 通常是异步的,因此在编写测试时,您需要等待它完成(否则 Main 会在您调用 Subscribe 后立即退出).

Yeah Rx is generally asynchronous so when writing tests, you need to wait for it to finish (otherwise Main just exits right after your call to Subscribe).

另外,请确保您订阅了通过调用 source.RetryWithBackoffStrategy(...) 生成的 observable.这会产生一个具有重试语义的 observable.

Also, make sure you subscribe to the observable produced by calling source.RetryWithBackoffStrategy(...). That produces a new observable that has the retry semantics.

在这种情况下,最简单的解决方案是直接使用 Wait:

Easiest solution in cases like this is to literally use Wait:

try
{
  var source2 = source.RetryWithBackoffStrategy(/*...*/);

  // blocks the current thread until the source finishes
  var result = source2.Wait(); 
  Console.WriteLine("result=" + result);
}
catch (Exception err)
{
  Console.WriteLine("uh oh", err);
}

如果你使用 NUnit(支持异步测试)之类的东西来编写你的测试,那么你可以这样做:

If you use something like NUnit (which supports asynchronous tests) to write your tests, then you can do:

[Test]
public async Task MyTest()
{
    var source = // ...;
    var source2 = source.RetryWithBackoffStrategy(/*...*/);
    var result = await source2; // you can await observables
    Assert.That(result, Is.EqualTo(5));
}

这篇关于反应式扩展和重试的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆