通过Task函数可实现无限IObservable以及可通过参数进行切换 [英] Infinite IObservable from Task function and toggle observable with parameters

查看:152
本文介绍了通过Task函数可实现无限IObservable以及可通过参数进行切换的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

这是针对以下问题的后续问题:通过Task函数和切换可观察的

This is follow up question to : Infinite IObservable from Task function and toggle observable

上面的问题询问是否可以从中创建重复的 IObservable< TResult> IObservable< bool>切换 Task< TResult>查询,因此如果最后一个切换 true 而不是在处重复调用该查询如果最后切换全部为 false 。使用 Defer Switch 方法似乎很容易实现。

Above question asks if it is possible to create a repeating IObservable<TResult> from IObservable<bool> toggle and Task<TResult> query, so that query is called repeatedly if last toggle was true and not called at all if last toggle is false. That seems to be pretty easily achieved using Defer and Switch methods.

但是这有问题,因为查询没有参数化。具体来说, query 函数有两种类型的参数(使签名 Func< int,TParam,IQueryable< TResult>> )。每次调用该方法时,第一个参数都会增加。第二个参数是来自另一个 IObservable< TParam>的最新值。参数

But that has problem, because the query is not parametrized. Concretely, there are two types of parameter to the query function (making the signature Func<int, TParam, IQueryable<TResult>>). First parameter is incremented every time the method is called. Second parameter is latest value from another IObservable<TParam> params.

再次,我希望能够自动测试此设置。

Again, I want to be able to test this setup automatically.

这是方法树桩:

public static IObservable<TResult> Function<TParam, TResult>(IObservable<bool> toggle, IObservable<TParam> param, Func<int, TParam, IObservable<TResult>> query)
{
    param.Subscribe(a => { }); // dummy to make debug output

    return toggle
        .Select(b => b
            ? Observable
                .Defer(() => query(0, default(TParam))) // dummy parameters for debugging
                .Repeat()
            : Observable
                .Never<TResult>())
        .Switch();
}

并测试应通过的测试:

[Test]
public void Test_Function()
{
    var scheduler = new TestScheduler();

    var toggle = scheduler.CreateHotObservable(
        OnNext(10, false),
        OnNext(11, true),
        OnNext(18, false),
        OnNext(30, true),
        OnNext(45, false),
        OnNext(100, false)
        ).Do(x => Console.WriteLine(scheduler.Clock + " toggle " + x));

    var prms = scheduler.CreateHotObservable(
        OnNext(10, "a"),
        OnNext(29, "b"),
        OnNext(39, "c")
        ).Do(x => Console.WriteLine(scheduler.Clock + " param " + x));

    var resultObs =
        Function(toggle, prms, (p1, p2) => scheduler.CreateColdObservable(OnNext(2, p1 + " " + p2), OnCompleted<string>(2)))
            .Do(x => Console.WriteLine(scheduler.Clock + " " + x));

    var results = scheduler.Start(() => resultObs, 0, 0, 100);

    results.Messages.AssertEqual(
        //10 toggle False
        //10 param a
        //11 toggle True
        OnNext(13, "0 a"),
        OnNext(15, "1 a"),
        OnNext(17, "2 a"),
        //18 toggle False // should not continue after toggle is off
        //29 param b
        //30 toggle True
        OnNext(32, "0 b"),
        OnNext(34, "1 b"),
        OnNext(36, "2 b"),
        OnNext(38, "3 b"),
        //39 param c
        OnNext(40, "4 b"), // fine if on parameter change, the currently running query finishes
        OnNext(42, "5 c"),
        OnNext(44, "6 c")
        //45 toggle False
        //100 toggle False
        );
}


推荐答案

所以我考虑了更多并设法找到了可行的解决方案。

So I thought about it more and managed to find a working solution.

public class Function2
{
    private readonly IObservable<bool> _toggle;
    private readonly IObservable<string> _param;
    private readonly Func<int, string, IObservable<string>> _query;

    private int _index;
    private string _latestParam;

    public Function2(IObservable<bool> toggle, IObservable<string> param, Func<int, string, IObservable<string>> query)
    {
        _toggle = toggle;
        _param = param;
        _query = query;
    }

    public IObservable<string> Execute()
    {
        // TODO : Dispose the subscriptions
        _toggle.Subscribe(OnToggle);
        _param.Subscribe(OnParam);

        return _toggle
            .Select(b => b
                ? Observable
                    .Defer(InnerQuery)
                    .Repeat()
                : Observable
                    .Never<string>())
            .Switch();
    }

    private void OnToggle(bool tgl)
    {
        // if toggle is on, reset the index
        if(tgl)
        {
            _index = 0;
        }
    }

    private void OnParam(string param)
    {
        _latestParam = param;
    }

    private IObservable<string> InnerQuery()
    {
        var ret = _query(_index, _latestParam);

        _index++;

        return ret;
    }
}

但我不喜欢它是因为:

But I don't like it because:


  • 在查询方法中更新索引。这似乎是通话有副作用的坏情况。

  • 切换需要订阅两次

  • 我不确定是否以及如何处理订阅。

  • Updating the index is done in the query method. This seem like bad case of call having side effects.
  • toggle needs to be subscribed twice
  • I'm not sure if and how to handle disposing of the subscriptions. This will run from start to end of the application, so it might not be an issue, but still.

我喜欢它,因为它会从应用程序的开始到结束运行,因此仍然不是问题。可以轻松地根据传入的参数更改和更新索引来微调和修改参数更新的行为。

I like it because it make it easy to fine-tune and modify the behavior of update of parameters based on incoming parameter change and updating index.

这篇关于通过Task函数可实现无限IObservable以及可通过参数进行切换的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆