为什么回调的OnError从给定用户投掷时从来不叫? [英] Why is the OnError callback never called when throwing from the given subscriber?

查看:100
本文介绍了为什么回调的OnError从给定用户投掷时从来不叫?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

请,请遵守以下单元测试:

 使用系统;
使用System.Reactive.Linq;
使用的System.Threading;
使用System.Threading.Tasks;
使用Microsoft.VisualStudio.TestTools.UnitTesting;命名空间单元测试
{
    [识别TestClass]
    公共类TestRx
    {
        公共const int的UNIT_TEST_TIMEOUT = 5000;        私有静态的IObservable< INT> GetObservable(INT数= 100,INT msWait = 10)
        {
            返回Observable.Create<&诠释GT;(异步(OBS,的CancellationToken)=>
            {
                的for(int i = 0; I<计数和放大器;&安培;!cancellationToken.IsCancellationRequested ++ I)
                {
                    int值=我;
                    obs.OnNext(的await Task.Factory.StartNew(()=>
                    {
                        Thread.sleep代码(msWait);
                        返回值;
                    }));
                }
            });
        }        [TestMethod的,TestCategory(CI),超时(UNIT_TEST_TIMEOUT)
        公共无效订阅()
        {
            VAR TCS =新TaskCompletionSource<对象>();
            INT I = 0;
            。GetObservable()订阅(N =>
            {
                Assert.AreEqual(I,N);
                ++我;
            },E => Assert.Fail()()=>
            {
                Assert.AreEqual(100,i)的;
                tcs.TrySetResult(NULL);
            });            tcs.Task.Wait();
        }        [TestMethod的,TestCategory(CI),超时(UNIT_TEST_TIMEOUT)
        公共无效SubscribeCancel()
        {
            VAR TCS =新TaskCompletionSource<对象>();
            VAR CTS =新CancellationTokenSource();
            INT I = 0;
            。GetObservable()订阅(N =>
            {
                Assert.AreEqual(I,N);
                ++我;
                如果(我== 5)
                {
                    cts.Cancel();
                }
            },E =>
            {
                Assert.IsTrue(ⅰ&小于100);
                tcs.TrySetResult(NULL);
            },()=>
            {
                Assert.IsTrue(ⅰ&小于100);
                tcs.TrySetResult(NULL);
            },cts.Token);            tcs.Task.Wait();
        }        [TestMethod的,TestCategory(CI),超时(UNIT_TEST_TIMEOUT)
        公共无效SubscribeThrow()
        {
            VAR TCS =新TaskCompletionSource<对象>();
            INT I = 0;
            。GetObservable()订阅(N =>
            {
                Assert.AreEqual(I,N);
                ++我;
                如果(我== 5)
                {
                    抛出新的异常(XO-XO);
                }
            },E =>
            {
                Assert.AreEqual(XO-XO,e.Message);
                tcs.TrySetResult(NULL);
            },Assert.Fail);            tcs.Task.Wait();
        }
    }
}

单元测试 SubscribeCancel SubscribeThrow 超时,因为的OnError 回调不会被调用,从而在任务等待永远不会结束。

什么是错的?

P.S。

此问题是关系到<一个href=\"http://stackoverflow.com/questions/23847424/how-to-wrap-sqldatareader-with-iobservable-properly\">How用正确的IObservable包裹SqlDataReader的?

修改

在此期间我已经创建了一个新的接收问题 - 的https://rx.$c $ cplex.com /工作项/ 74

还有<一个href=\"http://social.msdn.microsoft.com/Forums/en-US/5d0a4808-3ee0-4ff0-ab11-8cd36460cd66/why-is-the-onerror-callback-never-called-when-throwing-from-the-given-subscriber?forum=rx\" rel=\"nofollow\">http://social.msdn.microsoft.com/Forums/en-US/5d0a4808-3ee0-4ff0-ab11-8cd36460cd66/why-is-the-onerror-callback-never-called-when-throwing-from-the-given-subscriber?forum=rx

EDIT2

以下观察者执行产生完全相同的结果,即使它与段落所述的接收设计指南 - 订阅的实现不应该抛出

 私有静态的IObservable&LT; INT&GT; GetObservable(INT数= 100,INT msWait = 10)
{
    返回Observable.Create&LT;&诠释GT;(异步(OBS,的CancellationToken)=&GT;
    {
        尝试
        {
            的for(int i = 0; I&LT;计数和放大器;&安培;!cancellationToken.IsCancellationRequested ++ I)
            {
                int值=我;
                obs.OnNext(的await Task.Factory.StartNew(()=&GT;
                {
                    Thread.sleep代码(msWait);
                    返回值;
                }));
            }
            obs.OnCompleted();
        }
        赶上(例外EXC)
        {
            obs.OnError(EXC);
        }
    });
}

EDIT3

我开始相信,都应该写一个code这样在异步可观察序列整合到其它同步code(这通常是在服务器端的情况下在一个地方或其他):

  VAR TCS =新TaskCompletionSource&LT;对象&gt;();
。GetObservable()订阅(N =&GT;
{
  尝试
  {
    ...
  }
  赶上(例外五)
  {
    DoErrorLogic();
    tcs.TrySetException(E);
  }
},E =&GT;
{
  DoErrorLogic();
  tcs.TrySetException(E);
},()=&GT;
{
  DoCompletedLogic();
  tcs.TrySetResult(NULL);
});tcs.Task.Wait();

难道真是这样吗?

修改4

我觉得它终于开始涓滴我脑子生锈你想说。我将切换到我现在的其他职位 - <一个href=\"http://stackoverflow.com/questions/23847424/how-to-wrap-sqldatareader-with-iobservable-properly\">How用正确的IObservable包裹SqlDataReader的?


解决方案

这行为是设计使然。如果用户抛出一个异常(这是不好的做法的方式),则RX框架正确的原因,是死的,它没有进一步的通信。如果订阅被取消,这也不是一个错误 - 只是一个请求发送任何形式的进一步活动 - 其中接收荣誉

编辑回应评论

我不认为这是一个简单的参考在文档指向 - 您所看到的行为是其内在它的含蓄。我能得到最接近的是在源头code为<一个指向你href=\"http://rx.$c$cplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Core/Reactive/AnonymousSafeObserver.cs\"相对=nofollow> AnonymousSafeObserver 和<一个href=\"http://rx.$c$cplex.com/SourceControl/latest#Rx.NET/Source/System.Reactive.Core/Reactive/Internal/AutoDetachObserver.cs\"相对=nofollow> AutoDetatchObserver 。后者有一个解释性的场景,也许会有帮助,但它是一个有点棘手。

也许一个类比会有所帮助。试想一下,在数据流事件是一个报摊被交付报纸。和订阅者的家庭。

用户抛出异常

在报摊愉快地提供报纸,直到有一天,用户之一 - 琼斯先生 - 离开他的气就和他家发生爆炸杀死琼斯先生和毁坏的房子(抛出未处理的异常)。该报摊意识到他再也不能送报纸琼斯先生和他都不可以发送终止通知,并没有与报纸供应没有问题(这样的OnError或OnCompleted是不恰当的)和报刊亭进行少了一个用户。

与报纸印刷不小心使用了易燃的墨水,在火焰发送出厂前对比这一点。现在穷人报摊的确应该送一份解释性说明(的OnError)来的所有的它的订户供应无限期停止。

用户取消订阅

先生。琼斯从他收到订阅的报纸,直到有一天,他决定他生病去pressing故事奔流不息的,并要求取消他订阅。该报摊责成。他不发琼斯先生一个说明,解释报纸已停止印刷版(无OnCompleted) - 他们没有。他既不发送琼斯先生一个说明,解释的报纸已经停业(无的OnError)的 - 他只是停止送报纸,琼斯先生要求

响应EDIT3

我很同情你的斗争。我注意到在您的code你一直在试图网与接收的TPL(任务)成语。这些尝试往往感到笨拙,因为他们真的是相当不同的世界。这是相当困难在这样的一段评论:


  

    

我开始相信,都应该写一个code这样在异步可观察序列整合到其它同步code(这通常是在服务器端的情况下在一个地方或其他):


  

与布兰登的制作精良的断言强有力的协议,我想不出情况下它真的适合于在服务器端异步code与同步code集成的这样你尝试。这感觉就像一个设计的气味给我。惯用,一会尽量保持在code反应 - 使订阅,并让用户处理工作被动。我不记得跨越必需品来过渡到同步code你描述的方式。

当然,看着你在EDIT3写的code,目前还不清楚你想达到什么目的。这不是的的的作出反应中的错误的用户的责任。这是尾巴摇狗。这需要在那里,以保证用户的服务连续性的异常处理程序应该在认购手续code,而不是源可观察到的 - 但只应免受流氓观察者的行为本身的关注。这样的逻辑在上文中AnonymousSafeObserver实施,所使用的大多数的接收提供的运营商。可观察到的很可能有逻辑来处理的连续性其的<​​em>源的数据 - 但是这是一个不同的关注,而不是一个你在你的code正在处理

无论你正试图弥合通过电话同步code到 ToTask 等待有可能是原因仔细考虑您的设计。

我觉得提供了一个更具体的问题陈述 - 也许从正在试图解决一个真实的场景绘制 - 将有助于你更引发有益的建议。在'SqlDataReader`例子,你说......


  

    

最后,人们可能会用观察的订阅它[包装一个SqlDataReader]直接,但他们将不得不等待最终在某一时刻(阻塞线程),因为大多数code左右仍然是同步的


  

...突出了设计的泥潭,你是在这种情况下,你推断出这样的消费者显然会更好使用的IEnumerable&LT; T&GT; 接口 - 或者要求一个的IObservable&LT;名单,LT; T&GT;&GT; 。但关键是看的大局观,你试图在所有可观察包装的来包装一个SqlDataReader事实的是一个设计的味道 - 因为这是一个固定的数据的提供响应一个特定时间请求。这可能是一个异步的情况 - 但不是一个真正的反应之一。一个更典型的对比度反应的类似情景给我的价格为股票的十当你让他们你在哪里为用户设置数据的未来现金流完全由源的遗志随即反应过来。

Please, observe the following unit test:

using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace UnitTests
{
    [TestClass]
    public class TestRx
    {
        public const int UNIT_TEST_TIMEOUT = 5000;

        private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
        {
            return Observable.Create<int>(async (obs, cancellationToken) =>
            {
                for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
                {
                    int value = i;
                    obs.OnNext(await Task.Factory.StartNew(() =>
                    {
                        Thread.Sleep(msWait);
                        return value;
                    }));
                }
            });
        }

        [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
        public void Subscribe()
        {
            var tcs = new TaskCompletionSource<object>();
            int i = 0;
            GetObservable().Subscribe(n =>
            {
                Assert.AreEqual(i, n);
                ++i;
            }, e => Assert.Fail(), () =>
            {
                Assert.AreEqual(100, i);
                tcs.TrySetResult(null);
            });

            tcs.Task.Wait();
        }

        [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
        public void SubscribeCancel()
        {
            var tcs = new TaskCompletionSource<object>();
            var cts = new CancellationTokenSource();
            int i = 0;
            GetObservable().Subscribe(n =>
            {
                Assert.AreEqual(i, n);
                ++i;
                if (i == 5)
                {
                    cts.Cancel();
                }
            }, e =>
            {
                Assert.IsTrue(i < 100);
                tcs.TrySetResult(null);
            }, () =>
            {
                Assert.IsTrue(i < 100);
                tcs.TrySetResult(null);
            }, cts.Token);

            tcs.Task.Wait();
        }

        [TestMethod, TestCategory("CI"), Timeout(UNIT_TEST_TIMEOUT)]
        public void SubscribeThrow()
        {
            var tcs = new TaskCompletionSource<object>();
            int i = 0;
            GetObservable().Subscribe(n =>
            {
                Assert.AreEqual(i, n);
                ++i;
                if (i == 5)
                {
                    throw new Exception("xo-xo");
                }
            }, e =>
            {
                Assert.AreEqual("xo-xo", e.Message);
                tcs.TrySetResult(null);
            }, Assert.Fail);

            tcs.Task.Wait();
        }
    }
}

The unit tests SubscribeCancel and SubscribeThrow time out, because the OnError callback is never called and thus the waiting on the task never ends.

What is wrong?

P.S.

This question is related to How to wrap SqlDataReader with IObservable properly?

EDIT

In the meantime I have created a new Rx issue - https://rx.codeplex.com/workitem/74

Also http://social.msdn.microsoft.com/Forums/en-US/5d0a4808-3ee0-4ff0-ab11-8cd36460cd66/why-is-the-onerror-callback-never-called-when-throwing-from-the-given-subscriber?forum=rx

EDIT2

The following observer implementation produces exactly the same result, even though it complies with the paragraph 6.5 of the Rx Design Guidelines - "Subscribe implementations should not throw":

private static IObservable<int> GetObservable(int count = 100, int msWait = 10)
{
    return Observable.Create<int>(async (obs, cancellationToken) =>
    {
        try
        {
            for (int i = 0; i < count && !cancellationToken.IsCancellationRequested; ++i)
            {
                int value = i;
                obs.OnNext(await Task.Factory.StartNew(() =>
                {
                    Thread.Sleep(msWait);
                    return value;
                }));
            }
            obs.OnCompleted();
        }
        catch (Exception exc)
        {
            obs.OnError(exc);
        }
    });
}

EDIT3

I am starting to believe that one is supposed to write a code like this when an asynchronous observable sequence is integrated into an otherwise synchronous code (which would usually be the case on a server side in one place or another):

var tcs = new TaskCompletionSource<object>();
GetObservable().Subscribe(n =>
{
  try
  {
    ...
  }
  catch (Exception e)
  {
    DoErrorLogic();
    tcs.TrySetException(e);
  }
}, e =>
{
  DoErrorLogic();
  tcs.TrySetException(e);
}, () => 
{
  DoCompletedLogic();
  tcs.TrySetResult(null);
});

tcs.Task.Wait();

Is it really so?

EDIT 4

I think it finally starts trickling down my rusty brain what you are trying to say. I will switch to my other post now - How to wrap SqlDataReader with IObservable properly?

解决方案

This behaviour is by design. If the subscriber throws an exception (which is bad practice by the way), the Rx framework correctly reasons it is dead and communicates with it no further. If a subscription is cancelled, this is also not an error - merely a request to send no further events of any kind - which Rx honours.

Edit in response to comments

I don't think there is an easy reference to point at in documentation - the behaviour you are seeing is so intrinsic it's implicit. The closest I can get is to point you at the source code for AnonymousSafeObserver and AutoDetatchObserver. The latter has an explanatory scenario that might help, but it's a little involved.

Perhaps an analogy would help. Imagine the data stream events are newspapers being delivered by a newsagent. and the subscribers are households.

Subscriber throws an exception

The newsagent happily delivers newspapers until one day, one of the subscribers - a Mr. Jones - leaves his gas on and his house explodes killing Mr. Jones and destroying the house (throw unhandled exception). The newsagent realises that he can no longer deliver newspapers to Mr. Jones and neither can he send a termination notice and there is no problem with the newspaper supply (so OnError or OnCompleted is not appropriate) and the newsagent carries on with one less subscriber.

Contrast this with the newspaper printers inadvertently using inflammable ink and sending the factory up in flames. Now the poor newsagent must indeed send an explanatory note (OnError) to all it's subscribers that supply has stopped indefinitely.

Subscriber cancels subscription

Mr. Jones is receiving newspapers from his subscription, until one day he decides he is sick of the endless torrent of depressing stories and asks to cancel his subscription. The newsagent obliges. He does not send Mr. Jones a note explaining the newspaper has stopping printing editions (no OnCompleted) - they haven't. Neither does he send Mr. Jones a note explaining the newspaper has gone out of business (no OnError) - he just stops delivering newspapers, as Mr. Jones requested.

Response to Edit3

I sympathise with your struggle. I note throughout your code you have been trying to mesh the TPL (Task) idiom with that of Rx. Such attempts often feel clumsy because they are really quite different worlds. It's quite hard to comment on a paragraph like this:

I am starting to believe that one is supposed to write a code like this when an asynchronous observable sequence is integrated into an otherwise synchronous code (which would usually be the case on a server side in one place or another):

In strong agreement with Brandon's well-made assertion, I can't think of instances where it's really appropriate to integrate asynchronous code with synchronous code on the server side in the way you are attempting. This feels like a design smell to me. Idiomatically, one would try to keep the code reactive - make the subscription, and let the subscriber handle work reactively. I can't recall coming across a necessity to transition into synchronous code the way you describe.

Certainly, looking at the code you wrote in Edit3, it's not clear what you are trying to achieve. It's not the responsibility of the source to react to errors in a subscriber. This is the tail wagging the dog. The exception handlers that need to be there to ensure continuity of service of a subscriber should be in the subscription handling code, not in the source observable - it should only concern itself with protection from rogue observer behaviour. Such logic is implemented in the AnonymousSafeObserver linked above and is used by most of the Rx supplied operators. The observable may very well have logic to handle continuity of its source data - but that is a different concern, and not one you are addressing in your code.

Wherever you are attempting to bridge to synchronous code via calls to ToTask or Wait there is probably a cause to consider your design carefully.

I feel that providing a more concrete problem statement - perhaps drawn from a real world scenario you are trying to solve - would serve to elicit more helpful advice for you. The 'SqlDataReader` example where you say...

Finally people might use the observable [wrapping a SqlDataReader] directly by subscribing to it, but they would have to wait for the end (blocking the thread) at some point, since most of the code around is still synchronous.

... highlights the design quagmire you are in. In this case as you infer such consumers would clearly be better off using an IEnumerable<T> interface - or perhaps asking for an IObservable<List<T>>. But the key is to look at the bigger picture, the fact you are trying to wrap a SqlDataReader in an observable wrapper at all is a design smell - because this is a of supply of fixed data in response to a specific one-time request. This is possibly an asynchronous scenario - but not really a reactive one. Contrast with a more typically reactive scenario like "send me prices for stock X whenever you get them" where you are setting up a future flow of data entirely at the behest of the source for subscribers to then react.

这篇关于为什么回调的OnError从给定用户投掷时从来不叫?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆