RX退回并重试 [英] Rx back off and retry

查看:62
本文介绍了RX退回并重试的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这基于此SO中提供的代码:编写Rx"RetryAfter";扩展方法

This is based on the code presented in this SO : Write an Rx "RetryAfter" extension method

我正在使用Markus Olsson的代码(目前仅进行评估),在有人问我试图在Github上掌握Markus的代码之前,但这在我工作的地方受阻,所以我感到唯一可以做的事情是在这里问问.抱歉,如果这与任何人都坐不好的话.

I am using the code by Markus Olsson (evaluation only at the moment), and before anyone asks I have tried to get hold of Markus on Github, but that is blocked where I work, so I felt the only thing I could do was ask here at SO. Sorry about that, if this sits badly with any one.

因此,在一个小示例中,我正在使用以下代码:

So I am using the following code, in a small demo which is this:

class Attempt1
{
    private static bool shouldThrow = true;

    static void Main(string[] args)
    {
        Generate().RetryWithBackoffStrategy(3,
               MyRxExtensions.ExponentialBackoff,
            ex =>
            {
                return ex is NullReferenceException;
            }, Scheduler.TaskPool)
            .Subscribe(
                OnNext,
                OnError
            );

        Console.ReadLine();
    }

    private static void OnNext(int val)
    {
        Console.WriteLine("subscriber value is {0} which was seen on threadId:{1}",
            val, Thread.CurrentThread.ManagedThreadId);
    }

    private static void OnError(Exception ex)
    {
        Console.WriteLine("subscriber bad {0}, which was seen on threadId:{1}",
            ex.GetType(),
            Thread.CurrentThread.ManagedThreadId);
    }

    static IObservable<int> Generate()
    {
        return Observable.Create<int>(
            o =>
            {
                Scheduler.TaskPool.Schedule(() =>
                {
                    if (shouldThrow)
                    {
                        shouldThrow = false;
                        Console.WriteLine("ON ERROR NullReferenceException");
                        o.OnError(new NullReferenceException("Throwing"));
                    }
                    Console.WriteLine("Invoked on threadId:{0}",
                        Thread.CurrentThread.ManagedThreadId);

                    Console.WriteLine("On nexting 1");
                    o.OnNext(1);
                    Console.WriteLine("On nexting 2");
                    o.OnNext(2);
                    Console.WriteLine("On nexting 3");
                    o.OnNext(3);
                    o.OnCompleted();
                    Console.WriteLine("On complete");
                    Console.WriteLine("Finished on threadId:{0}",
                        Thread.CurrentThread.ManagedThreadId);

                });

                return () => { };
            });
    }
}

public static class MyRxExtensions
{
    /// <summary>
    /// An exponential back off strategy which starts with 1 second and then 4, 9, 16...
    /// </summary>
    public static readonly Func<int, TimeSpan>
        ExponentialBackoff = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

    public static IObservable<T> RetryWithBackoffStrategy<T>(
        this IObservable<T> source,
        int retryCount = 3,
        Func<int, TimeSpan> strategy = null,
        Func<Exception, bool> retryOnError = null,
        IScheduler scheduler = null)
    {
        strategy = strategy ?? MyRxExtensions.ExponentialBackoff;

        int attempt = 0;

        return Observable.Defer(() =>
        {
            return ((++attempt == 1) ? source : source.Delay(strategy(attempt - 1), scheduler))
                .Select(item => new Tuple<bool, T, Exception>(true, item, null))
                .Catch<Tuple<bool, T, Exception>, Exception>(e =>
                    retryOnError(e)
                    ? Observable.Throw<Tuple<bool, T, Exception>>(e)
                    : Observable.Return(new Tuple<bool, T, Exception>(false, default(T), e)));
        })
        .Retry(retryCount)
        .SelectMany(t => t.Item1
            ? Observable.Return(t.Item2)
            : Observable.Throw<T>(t.Item3));
    }
}

代码对我来说很有意义,如果失败了,我们尝试执行操作,然后退回并重试.可以指定我们要重试的异常类型,并且我们还可以看到订阅者在重试后只能看到最终值一次,这是可行的(在上面的演示代码中,仅完成了Exception(第一次是OnError)).

The code makes sense to me, we try and do an operation if that fails we back off and retry. The exception type that we want to retry can be specified, and we also see that the subscriber only sees the final values once after retry which works (in the demo code above the Exception is only done (OnError'd the first time)).

因此,除了一件事外,代码通常都能按预期工作.

So generally the code works as expected except for one thing.

如果我看一下上面代码产生的输出,我会得到:

If I look at the output the code above produces I get this:

ON ERROR NullReferenceException 
Invoked on threadId:10 
On nexting 1
Invoked on threadId:11 
On nexting 1 
On nexting 2 
On nexting 3 
On complete 
Finished on threadId:10 
On nexting 2 
On nexting 3 
On complete 
Finished on threadId:11 
subscriber value is 1 which was seen on threadId:10 
subscriber value is 2 which was seen on threadId:10
subscriber value is 3 which was seen on threadId:10

对我而言,有趣的是,所有订阅者值都一目了然,我希望当调用Generate()方法中的OnNext时,订阅者OnNext会将OnNext的值写入控制台输出.

The interesting thing for me here is that the subscriber values all come in one go, I would have expected that when the OnNext within the Generate() method were called that the Subscribers OnNext would write the OnNext'ed value to the Console output.

有人可以阐明为什么会这样吗?

Could anyone shed any light on why this might be?

推荐答案

这是因为您要在结果流上放置 Delay .(在第二次迭代中传递给 ExponentialBackoff 的n值为1,延迟为1秒.)

It's because you are putting a Delay on the result stream. (The value for n passed to ExponentialBackoff on the second iteration is 1, giving a delay of 1 second.)

Delay 在源上运行,但是源正常运行. Delay 计划在指定的持续时间后发出的结果.因此,在Generate逻辑运行完成后,订户即可获得结果.

Delay operates on source, but the source proceeds as normal. Delay schedules the results it receives to be emitted after the specified duration. So the subscriber is getting the results after the logic of Generate has run to completion.

如果您考虑一下,这就是 Delay 的必须方式-否则 Delay 将能够以某种方式干扰上游运营商!

If you think about it this is how Delay must be - otherwise Delay would be able to somehow interfere with upstream operators!

作为慢消费方,有可能干扰上游操作员(无抛出异常).但这对于简单的 Delay 行为肯定是非常糟糕的方式.

It is possible to interfere with upstream operators (without throwing exceptions), by being a slow consumer. But that would certainly be a very bad way for a simple Delay to behave.

我认为 Delay 不是您想要的-因为 Delay 不会延迟其订阅.如果您改用 DelaySubscription ,您会得到我想得到的一切.这也是链接问题中使用的内容.

I don't think the Delay is what you intend here - because Delay doesn't delay it's subscription. If you use DelaySubscription instead, you'll get what you're after I think. This is what's used in the linked question too.

您的问题很好地说明了 Delay DelaySubscription 之间的区别!值得在这里考虑 Defer .

Your question provides a great illustration of the difference between Delay and DelaySubscription! It's worth thinking about Defer in here too.

这三个之间的区别是微妙的,但意义重大,所以让我们总结一下这三个:

The distinction between these three is subtle but significant, so let's summarize all three:

  • 延迟-在其 Subscribe 调用 Subscribe 时立即调用目标运算符以获取 IObservable 立即在目标上,在指定的 Scheduler 上指定的延迟后安排要交付的事件.

  • Delay - Calls target operator immediately to get an IObservable, on its Subscribe calls Subscribe on target immediately, schedules events for delivery after specified delay on the specified Scheduler.

DelaySubscription -立即调用目标运算符以获取 IObservable .在其 Subscribe 上,在指定的 Scheduler 上指定延迟后,在目标上调度 Subscribe .

DelaySubscription - Calls target operator immediately to get an IObservable. On its Subscribe schedules Subscribe on target for execution after specified delay on the specified Scheduler.

Defer -没有目标运算符.在 Subscribe 上运行提供的工厂函数以获取目标 IObservable ,并立即调用 Subscribe .没有添加延迟,因此没有要指定的 Scheduler .

Defer - Has no target operator. On Subscribe runs provided factory function to get target IObservable and immediately calls Subscribe. There's no delay added, hence no Scheduler to specify.

这篇关于RX退回并重试的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆