使用 Reactive Extensions 测试 Observables 时如何为 Task.Run 使用虚拟时间 [英] How to use virtual time for Task.Run when testing Observables with Reactive Extensions

查看:21
本文介绍了使用 Reactive Extensions 测试 Observables 时如何为 Task.Run 使用虚拟时间的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想测试以下功能

/// Items are processed asynchronously via fn as they arrive. However
/// if an item arrives before the last asynchronous operation has
/// completed then the cancellation token passed to fn will be
/// triggered enabling the task to be canceled in a best effort
/// way.
public static IObservable<U> SelectWithCancellation<T, U>
    ( this IObservable<T> This
    , Func<CancellationToken, T, Task<U>> fn 
    )
{
    return This
        .Select(v=>Observable.FromAsync(token=>fn(token, v)))
        .Switch();
}

我想测试它,并且是我能想到的最好的有效的是下面.首先我创建一个长时间运行的任务可以取消

I wish to test it and the best I have been able to come up with that works is below. First I create a long running task that can be canceled

public Task<string> JobTask
    ( CancellationToken token
    , string input
    )
{
    return Task.Factory.StartNew(() =>
        {
            if ( input == "C" || input == "E" )
            {
                while ( !token.IsCancellationRequested ) ;
            }
            return input;
        }
    );
}

然后我测试它真的有效

public class SelectWithCancelationSpec : ReactiveTest
{
    TestScheduler _Scheduler = new TestScheduler();

    [Fact]
    public void ShouldWork()
    {
        var o = _Scheduler.CreateHotObservable
            ( OnNext(100, "A")
            , OnNext(200, "B")
            , OnNext(300, "C")
            , OnNext(400, "D")
            , OnNext(500, "E")
            , OnNext(500, "F")
            );

        List<string> actual = new List<string>();

        o
            .SelectWithCancellation(JobTask)
            .Subscribe(v => actual.Add(v));

        var delay = 100;
        _Scheduler.AdvanceTo(150);
        Thread.Sleep(delay);
        _Scheduler.AdvanceTo(250);
        Thread.Sleep(delay);
        _Scheduler.AdvanceTo(350);
        Thread.Sleep(delay);
        _Scheduler.AdvanceTo(450);
        Thread.Sleep(delay);
        _Scheduler.AdvanceTo(550);
        Thread.Sleep(delay);
        _Scheduler.AdvanceTo(650);


        var expected = new[] { "A", "B", "D", "F" };

        actual
            .ShouldBeEquivalentTo(expected);

    }
}

问题是我不得不将 real time 引入到测试.这是因为我模拟的 JobTask 运行在一个真实的线程脱离线程池并且不尊重虚拟时间测试调度程序.如果我不拖延会发生什么在 AdvanceTo 调用之间,我丢弃的消息比我期望在测试中,因为 JobTask 需要很长时间来处理.

The problem is that I have had to introduce real time into the test. This is because my simulated JobTask is running on a real thread off the thread pool and doesn't respect the virtual time of the test scheduler. What happens is if I don't put the delays in between the AdvanceTo calls is I drop more messages than I expect in the test because the JobTask takes too long to process.

问题是.我如何创建一个尊重虚拟时间并允许我测试我是否可以成功下降预期的消息.

The question is. How can I create a JobTask that respects the virtual time and allows me to test if I can successfully drop the intended messages.

推荐答案

关键是创建一个 TestScheduler 知道的滴答事件流.为此,我创建了一个扩展方法

The key is to create a stream of tick events that the TestScheduler knows about. For that purpose I created an extension method

public static class TestSchedulerExtensions
{
    public static IObservable<Unit> CreateTickObserver(this TestScheduler s, int startTick, int endTick, int tickStep)
    {
        var ticks = Enumerable.Repeat(1, Int32.MaxValue)
            .Select(( v, i ) => i * tickStep + startTick)
            .TakeWhile(v => v <= endTick)
            .Select(tick => ReactiveTest.OnNext(tick, Unit.Default));

        return s.CreateColdObservable(ticks.ToArray());

    }  
}

然后是另一种扩展方法来帮助在测试条件下创建任务

and then another extensions method to assist in creating Tasks under test conditions

    public static Func<CancellationToken,U,Task<T>>
        AsyncSelectorFactory<T, U>
        ( this TestScheduler s
        , int duration
        , int interval
        , Func<CancellationToken, U, IObservable<Unit>, Task<T>> fn 
        )
    {
        var ticker = s.CreateTickObserver(0, duration, interval);
        return ( c, u ) =>
        {
            return fn(c, u, ticker);
        };
    }

TaskFactory 生成的函数可以生成任务,但在测试调度程序的控制下通过一个自动收报机.那自动收报机可用于造成延误或其他事情.

The TaskFactory generates functions that can generate tasks but which are passed a ticker under the control of the test scheduler. That ticker can be used to cause delays or other things.

请注意,我们在 _Ticker 来源的 observable 上等待以创建延迟在任务中.现在我们的测试用例看起来像

Note above that we await on the _Ticker sourced observable to create the delay in the task. And now our test case looks like

现在测试很简单

public class SelectWithCancelationSpec : ReactiveTest
{
    TestScheduler _Scheduler = new TestScheduler();

    [Fact]
    public void ShouldWork()
    {

        var o = _Scheduler.CreateColdObservable
            ( OnNext(100, "A")
            , OnNext(200, "B")
            , OnNext(300, "C")
            , OnNext(400, "D")
            , OnNext(500, "E")
            , OnNext(600, "F")
            );

        int cancelCount = 0;
        var job = _Scheduler.AsyncSelectorFactory<string,string>
            ( 1000
            , 10
            , async ( token, input, ticker ) => { 
                if ( input == "C" || input == "E" )
                {
                    await ticker.TakeWhile(v => !token.IsCancellationRequested);
                    cancelCount++;
                }
                return input;
            });


        var actual = _Scheduler.Start(() =>
        {
            return o.SelectWithCancellation(job);
        }
        , created: 0
        , subscribed: 1
        , disposed: 1000
        );

        var expected = new[] { "A", "B", "D", "F" };

        cancelCount.Should().Be(2);


        actual.Messages.Select(v=>v.Value.Value)
            .ShouldBeEquivalentTo(expected);

    }



}

这篇关于使用 Reactive Extensions 测试 Observables 时如何为 Task.Run 使用虚拟时间的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆