RX-重新抛出包含方法中的错误 [英] RX - rethrow an error in containing method

查看:35
本文介绍了RX-重新抛出包含方法中的错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要将RX流( IObservable )中的错误转换为包含对该流订阅的方法中的异常

I need to translate an error in an RX stream (IObservable) into an exception in the method that contains the subscription to the stream

(由于此问题 https://github.com/aspnet/SignalR/pull/1331 ,因此错误不会序列化到客户端.)解决此问题后,我将恢复为正确处理错误

(because of this issue https://github.com/aspnet/SignalR/pull/1331 , Whereby errors arent serialised to clients.) Once this issue is fixed I will revert to handling error properly

例如

我有以下方法

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    return _mySvc.ThingChanged();
}

所以我尝试订阅该流并重新抛出该错误,但是它仍然没有传输到客户端:

So I have tried to subscribe to the stream and rethrow the error, but it still doesnt get transmitted to the client:

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    _mySvc.ThingChanged().Subscribe(item => {}, OnError, () => {});
    return _mySvc.ThingChanged();
}

private void OnError(Exception exception)
{
    throw new Exception(exception.Message);
}

我需要的是等价于LiveStream方法

What I need is the equivelent of throwing in the LiveStream method

例如此错误已传播给客户端

e.g. this error is propogated to the client

public IObservable<StreamItem> LiveStream()
{
    _mySvc.Start();
    throw new Exception("some error message");
    return _mySvc.ThingChanged();
}

任何想法如何实现?

推荐答案

我也发现了这一点,尤其是在包含"反应性管道的情况下,即具有明确定义的开始和结束的管道.在这种情况下,仅允许基础异常冒泡到包含作用域就足够了.但是,正如您所发现的,该概念通常对于Rx而言是陌生的:管道中发生的事情仍留在管道中.

I have found this as well, especially with a "contained" reactive pipeline—that is, one with a well-defined beginning and end. In situations like those, it may suffice to simply allow underlying exceptions to bubble up to the containing scope. But as you have found, that concept is rather foreign to Rx generally: what happens in the pipeline stays in the pipeline.

我在一个包含的场景中发现的唯一方法是使用 Catch()将错误滑出"流,然后递回空的 IObservable 以允许流自然停止(否则,如果您正在等待 IObservable 完成操作,则会挂起).

The only way out of this that I have found in a contained scenario is to "slip" the error out of the stream using Catch(), and hand back an empty IObservable to allow the stream to halt naturally (otherwise, you'll hang if you're awaiting an IObservable for completion).

LiveStream()方法内 中这将不起作用,因为在使用流之前,该上下文/作用域应该早已消失.因此,这必须在包含整个管道的上下文中发生.

This will not work within your LiveStream() method, because that context/scope should have passed out of existence long before you're consuming your stream. So, this will have to happen in the context that contains the whole pipeline.

Exception error = null;
var source = LiveStream()
  .Catch<WhatYoureStreaming, Exception>(ex => {error = ex; return Observable.Empty<WhatYoureStreaming>(); })
  ...

await source; // if this is how you're awaiting completion

// not a real exception type, use your own
if (error != null) throw new ContainingException("oops", error);

仅在最后不要抛出错误,否则您将丢失原始的堆栈跟踪.

Just don't throw error there at the end, you'll lose the original stack trace.

这篇关于RX-重新抛出包含方法中的错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆