如何实现更好的最终处方运算符? [英] How to implement a "better" Finally Rx operator?

查看:43
本文介绍了如何实现更好的最终处方运算符?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

最近我意识到RxFinally运算符的行为方式至少对我来说是意想不到的。我的预期是,finallyAction抛出的任何错误都将传播到操作员的观察者下游。遗憾的是,情况并非如此。实际上,操作符首先将Antecedent序列的完成(或失败)传播给它的观察者,然后然后在无法传播该操作抛出的潜在错误的时间点调用action。因此,它在ThreadPool上抛出错误,并使进程崩溃。这不仅出乎意料,而且非常有问题。以下是此行为的最小演示:

Observable
    .Timer(TimeSpan.FromMilliseconds(100))
    .Finally(() => throw new ApplicationException("Oops!"))
    .Subscribe(_ => { }, ex => Console.WriteLine(ex.Message),
        () => Console.WriteLine("Completed"));

Thread.Sleep(1000);

结果:未处理的异常(Fiddle)

Finallylambda引发的异常不会由SubscribeonError处理程序处理,因为它是理想的。

这个特性(我想称之为一个缺陷)严重限制了Finally运算符在我看来的用处。从本质上讲,只有当我想要调用一个预计永远不会失败的操作时,我才能使用它,如果它失败了,它将指示应用程序状态的灾难性损坏,而此时无法恢复。例如,我可以使用它来ReleaseaSemaphoreSlim(例如,就像我所做的here),只有当我的代码有错误时,它才会失败。在这种情况下,我可以接受我的应用程序崩溃。但我也使用它recently调用调用者提供的未知操作,该操作可能会失败,在这种情况下使应用程序崩溃是不可接受的。相反,错误应该向下游传播。所以我在这里问的是如何实现一个具有相同签名的Finally变体(让我们称之为FinallySafe),以及下面指定的行为:

public static IObservable<TSource> FinallySafe<TSource>(
    this IObservable<TSource> source, Action finallyAction);
  1. finallyAction应在source序列发出OnCompletedOnError通知之后调用,但此通知传播到观察者之前调用。
  2. 如果finallyAction调用成功,则应将原始的OnCompleted/OnError通知传播给观察者。
  3. 如果finallyAction调用失败,则应该向观察者传播OnError通知,其中包含刚刚发生的错误。在这种情况下,应忽略(不传播)前一个错误,即可能导致source失败的错误。
  4. source完成之前取消订阅FinallySafe时,也应该调用finallyAction。当订阅者(观察者)处置订阅时,finallyAction应该被同步调用,任何错误都应该传播给Dispose方法的调用者。
  5. 如果FinallySafe被多个观察者订阅,则每个订阅应独立调用一次finallyAction,遵循上述规则。并发调用正常。
  6. 每个订阅服务器调用finallyAction永远不应超过一次。

验证:将上述代码片段中的Finally替换为FinallySafe应会导致程序不会因未处理的异常而崩溃。

替代:我也愿意接受一个合理的答案,解释为什么内置Finally运算符的行为优于上面指定的自定义FinallySafe运算符的行为。

推荐答案

以下是FinallySafe运算符的实现,具有问题中指定的行为:

/// <summary>
/// Invokes a specified action after the source observable sequence terminates
/// successfully or exceptionally. The action is invoked before the propagation
/// of the source's completion, and any exception thrown by the action is
/// propagated to the observer. The action is also invoked if the observer
/// is unsubscribed before the termination of the source sequence.
/// </summary>
public static IObservable<T> FinallySafe<T>(this IObservable<T> source,
    Action finallyAction)
{
    return Observable.Create<T>(observer =>
    {
        var finallyOnce = Disposable.Create(finallyAction);
        var subscription = source.Subscribe(observer.OnNext, error =>
        {
            try { finallyOnce.Dispose(); }
            catch (Exception ex) { observer.OnError(ex); return; }
            observer.OnError(error);
        }, () =>
        {
            try { finallyOnce.Dispose(); }
            catch (Exception ex) { observer.OnError(ex); return; }
            observer.OnCompleted();
        });
        return new CompositeDisposable(subscription, finallyOnce);
    });
}
finallyAction被分配为Disposable.Create一次性实例的Dispose操作,以确保该操作最多被调用一次。然后,使用CompositeDisposable实例将此一次性订阅与source的一次性订阅组合在一起。

作为附注,我想解决一个问题,如果我们可以更进一步,并在取消订阅期间向下传播finallyAction可能的错误。在某些情况下,这可能是可取的,但不幸的是,这是不可能的。首先,也是最重要的是,这样做违反了The Observable Contract文件中的一项准则,即:

当观察者向可观察对象发出取消订阅通知时,该可观察对象将尝试停止向观察者发出通知。但是,不能保证观察者在向观察者发出取消订阅通知后不会向观察者发出通知。

因此,这样的实现将是不一致的。更糟糕的是,Observable.Create方法通过在处理订阅后立即将observer设置为静音来实施这一指导原则。它通过将观察者封装在AutoDetachObserver包装器中来实现这一点。即使我们试图通过从头实现IObservable<T>类型来绕过这一限制,任何可以附加在不一致的Finally运算符之后的内置运算符都将使取消订阅后的OnError通知静音。所以这是不可能的。无法将取消订阅过程中的错误传播到刚刚请求取消订阅的订阅服务器。

这篇关于如何实现更好的最终处方运算符?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆