如何使用ObserveOn时处理OnNext异常? [英] How to handle exceptions in OnNext when using ObserveOn?
问题描述
我的应用程序终止时的错误在 OnNext
按当我使用一个观察者抛出 ObserveOn(Scheduler.ThreadPool)
。我已经找到了对付这个问题的唯一方法是使用下面的自定义扩展方法(除了确保OnNext从来没有抛出异常)。然后确保每个 ObserveOn
后跟一个 ExceptionToError
。
公共静态的IObservable< T> ExceptionToError< T>(此的IObservable< T>源){
无功分=新的受试对象; T>();
source.Subscribe(I => {
尝试{
sub.OnNext(我);
}赶上(例外错误){
sub.OnError(ERR );
}
}
,E => sub.OnError(五),()=> sub.OnCompleted());
返回子;
}
不过,这感觉不对。有没有更好的方式来处理这件事?
示例
这程序由于未捕获的异常的崩溃。
类节目{
静态无效的主要(字串[] args){
尝试{
变种XS =新的受试对象; INT>();
xs.ObserveOn(Scheduler.ThreadPool).Subscribe(X => {
Console.WriteLine(X);
如果(X%5 == 0){$ (砰!)b $ b抛出新的System.Exception;
}
},EX => Console.WriteLine(中招:+ ex.Message)); //&下; - 未达到
xs.OnNext(1);
xs.OnNext(2);
xs.OnNext(3);
xs.OnNext(4);
xs.OnNext(5);
}赶上(例外五){
Console.WriteLine(中招:+ e.Message); //< - 也没有达到
} {最后
Console.ReadKey();
}
}
}
我们正在解决接收V2.0这个问题,首先是RC版本。你可以阅读所有关于它在我们的 http://blogs.msdn.com/rxteam 博客。它基本上可以归结为更有纪律的错误在管道本身,具有SubscribeSafe扩展方法(订阅时重定向错误而进入的OnError频道)合并处理,并在IScheduler(一个两难扩展方法来包装与异常处理逻辑围绕计划调度操作)。
关于这里提出的ExceptionToError方法,它有一个缺陷。该IDisposable的认购对象仍然可以为空当回调运行;有一个基本的竞争条件。要解决这个问题,你必须使用一个SingleAssignmentDisposable。
My application terminates when an error is thrown in OnNext
by an observer when I use ObserveOn(Scheduler.ThreadPool)
. The only way I have found to deal with this is by using a custom extension method below (apart from making sure OnNext never throws an exception). And then making sure that each ObserveOn
is followed by an ExceptionToError
.
public static IObservable<T> ExceptionToError<T>(this IObservable<T> source) {
var sub = new Subject<T>();
source.Subscribe(i => {
try {
sub.OnNext(i);
} catch (Exception err) {
sub.OnError(err);
}
}
, e => sub.OnError(e), () => sub.OnCompleted());
return sub;
}
However, this does not feel right. Is there a better way to deal with this?
Example
This program crashes because of uncaught exception.
class Program {
static void Main(string[] args) {
try {
var xs = new Subject<int>();
xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x => {
Console.WriteLine(x);
if (x % 5 == 0) {
throw new System.Exception("Bang!");
}
}, ex => Console.WriteLine("Caught:" + ex.Message)); // <- not reached
xs.OnNext(1);
xs.OnNext(2);
xs.OnNext(3);
xs.OnNext(4);
xs.OnNext(5);
} catch (Exception e) {
Console.WriteLine("Caught : " + e.Message); // <- also not reached
} finally {
Console.ReadKey();
}
}
}
We're addressing this issue in Rx v2.0, starting with the RC release. You can read all about it on our blog at http://blogs.msdn.com/rxteam. It basically boils down to more disciplined error handling in the pipeline itself, combined with a SubscribeSafe extension method (to redirect errors during subscription into the OnError channel), and a Catch extension method on IScheduler (to wrap a scheduler with exception handling logic around scheduled actions).
Concerning the ExceptionToError method proposed here, it has one flaw. The IDisposable subscription object can still be null when the callbacks run; there's a fundamental race condition. To work around this, you'd have to use a SingleAssignmentDisposable.
这篇关于如何使用ObserveOn时处理OnNext异常?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!