等待 IObservable 获取所有元素错误 [英] waiting IObservable to get all elements error

查看:34
本文介绍了等待 IObservable 获取所有元素错误的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有这门课:

public class TestService
{
     public IObservable<int> GetObservable(int max)
     {
         var subject = new Subject<int>();
         Task.Factory.StartNew(() =>
                               {
                                   for (int i = 0; i < max; i++)
                                   {
                                       subject.OnNext(i);
                                   }
                                   subject.OnCompleted();

                               });
         return subject;
     }
}

我也为此编写了一个测试方法:

I wrote a test method for this as well:

[TestMethod]
public void TestServiceTest1()
{
   var testService = new TestService();
   var i = 0;
   var observable = testService.GetObservable(3);
   observable.Subscribe(_ =>
   {
      i++;
   });          
   observable.Wait();
   Assert.AreEqual(i, 3);
}

但有时我会收到错误:Sequence contains no elements in method Wait().

But sometimes I get the error: Sequence contains no elements in method Wait().

我建议在测试到达 observable.Wait() 行之前完成我的 IObservable.我怎样才能避免这个错误?

I propose that my IObservable is completed before test reaches the observable.Wait() line. How can I avoid this error?

推荐答案

在我看来,这段代码的基本问题是 IObservable 表示如何观察某事的契约.

It seems to me that the basic problem in this code is that an IObservable represents a contract of how to observe something.

在这种情况下,GetObservable 方法不仅返回合同,而且立即执行工作(使用 TPL).如果您认为您可以多次订阅同一个 IObservable 实例(这实际上发生在示例代码中,因为您是第一次使用 Subscribe 和另一个带有 Wait).这个区别是我学习 Rx 的最大绊脚石.

In this case, the GetObservable method is not just returning a contract, it is performing work (with TPL) right away. This does not make sense if you consider that you can subscribe multiple times to the same IObservable instance (which is in fact happening in the sample code, as you're subscribing a first time with Subscribe and another with Wait). This single distinction was the biggest stumbling block for me in learning Rx.

我会改为实现这样的方法(并且完全避免使用 Subject<>,因为它不是创建 Observable 的首选方式):

I would instead implement the method like this (and avoid altogether using Subject<>, since it is not the preferred way of creating an Observable):

public class TestService
{
     public IObservable<int> GetObservable(int max)
     {
         return Observable.Create<int>((IObserver<int> observer) =>
                               {
                                   for (int i = 0; i < max; i++)
                                   {
                                       observer.OnNext(i);
                                   }
                                   observer.OnCompleted();
                               });
     }
}

还有对 Observable.Create 的有趣覆盖,提供了更大的灵活性.

There are also interesting overrides to Observable.Create which provide more flexibility.

这篇关于等待 IObservable 获取所有元素错误的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆