发布可观察对象时异步创建挂起 [英] Async Create hanging while publishing observable

查看:45
本文介绍了发布可观察对象时异步创建挂起的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

考虑以下代码:

var xs = Observable.Create<Unit>(async o => 
{
    await Task.Delay(10);
    o.OnError(new Exception());
}).Replay().RefCount();

xs.Subscribe(x => Console.WriteLine(x));
xs.Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine(ex.Message));
await xs.DefaultIfEmpty();

上面的序列不会抛出任何异常并且永远不会完成.

The sequence above doesn't throw any exceptions and never completes.

我做了以下观察:

  1. 删除第一个订阅会启用错误传播 - 在 Subscribe 上下文(最后一行)中抛出异常
  2. 删除 .Replay().RefCount() 启用错误传播 - 在 Subscribe 上下文中抛出异常(最后一行)
  3. 删除 await Task.Delay(10) 会启用错误传播 - 在 OnError 调用中(在 Create 方法内)抛出异常.令人惊讶的是,切换两个 Subscribe 方法会在 Subscribe 上下文(最后一行)抛出异常.
  1. Removing the first subscription enables error propagation - exception is thrown in Subscribe context (last line)
  2. Removing .Replay().RefCount() enables error propagation - exception is thrown in Subscribe context (last line)
  3. Removing await Task.Delay(10) enables error propagation - exception is thrown in OnError call (within Create method). Surprisingly, switching two Subscribe methods makes exception thrown at Subscribe context (last line).

话虽如此,我是问以下问题是否是故意的:

That being said, I am asking whether the following issues are by design:

  1. 上述场景中的可观察序列从未完成
  2. 有时会在 Create 方法中抛出异常,而其他时候 - 在 Subscribe 上下文中.
  1. Observable sequence in the above scenario never being completed
  2. The fact that exception is sometimes thrown inside Create method, and other times - at Subscribe context.

如果这是设计使然,您会推荐什么解决方法?在这种情况下,如何发布我的序列,以便我的所有客户端(观察者)可以安全地处理异常?当前的行为似乎很随意,尤其是对于库创建者.这也让调试变得非常痛苦.请指教.

If this is by design, what would you recommend as a workaround? How do I publish my sequences so that all of my clients (observers) can safely handle exceptions in this case? Current behavior seems so arbitrary, especially for library creators. It also makes debugging very painful. Please advise.

推荐答案

先回答你的问题:

  1. 不,这不是故意的.这是一个错误.
  2. 解决方法"是不要混合使用 TPL 和 Reactive.你可以打出这样有趣的事情.

以下按预期工作:

var xs = Observable.Throw<Unit>(new Exception())
    .Delay(TimeSpan.FromMilliseconds(10))
    .Replay()
    .RefCount();

这会导致在第一个 .Subscribeawait xs.DefaultIfEmpty() 调用时引发异常.由于延迟,您会收到两个异常: 多个线程正在运行.

This causes exceptions to be raised on the first .Subscribe and the await xs.DefaultIfEmpty() call. You get two exceptions because of the delay: Multiple threads are running.

至于为什么会发生这种情况,这是一个开始:

As for why this is happening, here's a start:

第一个订阅代码基本上翻译如下.(见源):

That first Subscribe code basically translates to the follow. (See source):

xs.Subscribe(x => Console.WriteLine(x), Stubs.Throw, Stubs.Nop);

public static class Stubs
{

    public static readonly Action Nop = delegate
    {
    };

    public static readonly Action<Exception> Throw = delegate (Exception ex)
    {
        var edi = ExceptionDispatchInfo.Capture(ex);
        edi.Throw();
    };
}

如果您在 Stubs 类中设置断点,您会看到它进入那里并尝试抛出异常.然而,异常不会冒泡,很可能是由于一些奇怪的 TPL/ReplaySubject 交互.

If you breakpoint into the Stubs class, you'll see that it gets in there and tries to throw the exception. However, the exception doesn't bubble up, most likely due to some weird TPL/ReplaySubject interaction.

这篇关于发布可观察对象时异步创建挂起的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆