System.Reactive 中递归调度的功能优势是什么? [英] What are the functional benefits of recursive scheduling in System.Reactive?

查看:55
本文介绍了System.Reactive 中递归调度的功能优势是什么?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在阅读 http://www.introtorx.com/ 并且我正在阅读非常有兴趣从我的反应式代码中剥离 Subject.我开始了解如何封装序列生成,以便我可以更好地推理给定序列.我阅读了一些 SO 问题并最终阅读了有关调度的内容.特别感兴趣的是递归调度,使用 Schedule(this IScheduler scheduler, Action<TState,Action<TState>>)重载 - 喜欢这个.

I'm current reading http://www.introtorx.com/ and I'm getting really interested in stripping Subject<T> out of my reactive code. I'm starting to understand how to encapsulate sequence generation so that I can reason better about a given sequence. I read a few SO questions and ended up reading about scheduling. Of particular interest is recursive scheduling, using the Schedule(this IScheduler scheduler, Action<TState,Action<TState>>)overloads - like this one.

这本书开始在一些领域显示出它的时代性,我看到最大的是它从未将其技术与使用 Taskasync/可以实现的替代方案进行比较await 语言特性.我总是觉得通过忽略书中的建议和使用异步玩具,我可以编写更少的代码,但我的思想背后唠叨我很懒惰,没有正确学习模式.

The book is starting to show its age in a few areas, and the biggest i see is that it never compares its techniques to alternatives that may be achieved using the Task and async/await language features. I always end up feeling like I could write less code by ignoring the book advice and using the asynchronous toys, but the back of my mind nags me about being lazy and not learning the pattern properly.

有了这个,这是我的问题.如果我想每隔一段时间安排一个序列,支持取消并在后台线程上执行工作,我可能会这样做:

With that, here is my question. If I wanted to schedule a sequence at an interval, support cancellation and perform work on a background thread, I might do this:

    static void Main(string[] args)
    {
        var sequence = Observable.Create<object>(o =>
        {
            CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
            DoWerk(o, cancellationTokenSource);

            return cancellationTokenSource.Cancel;
        });

        sequence.Subscribe(p => Console.Write(p));

        Console.ReadLine();
    }

    private static async void DoWerk(IObserver<object> o, CancellationTokenSource cancellationTokenSource)
    {
        string message =  "hello world!";
        for (int i = 0; i < message.Length; i++)
        {
            await Task.Delay(250, cancellationTokenSource.Token);
            o.OnNext(message[i]);

            if (cancellationTokenSource.IsCancellationRequested)
            {
                break;
            }
        }

        o.OnCompleted();
    }

注意使用 async void 来创建并发,而无需使用 Task.Run() 显式借用线程池线程.await Task.Delay() 然而,它会做到这一点,但不会长期租用线程.

Note the use of async void to create concurrency without explicitly borrowing a thread pool thread with Task.Run(). await Task.Delay() will, however, do just that but it will not lease the thread for long.

这里有什么限制和陷阱?您可能更喜欢使用递归调度的原因是什么?

What are the limitations and pitfalls here? What are the reasons that you might prefer to use recursive scheduling?

推荐答案

我个人不会使用 await Task.Delay(250, cancelTokenSource.Token); 作为减慢循环的方法.它比 Thread.Sleep(250) 好,但对我来说仍然是代码味道.

I personally wouldn't use await Task.Delay(250, cancellationTokenSource.Token); as a way to slow down a loop. It's better than Thread.Sleep(250), but it's still code smell to me.

我认为您应该优先使用内置运算符而不是像这样自己动手的解决方案.

I would look at it that you should use a built-in operator in preference to a roll-your-own solution like this.

您需要的运算符是最强大但经常被忽视的运算符之一.试试 Observable.Generate.他是这样的:

The operator you need is one of the most powerful, but often overlooked. Try Observable.Generate. He's how:

static void Main(string[] args)
{
    IObservable<char> sequence = Observable.Create<char>(o =>
    {
        string message =  "hello world!";
        return
            Observable
                .Generate(
                    0,
                    n => n < message.Length,
                    n => n + 1,
                    n => message[n],
                    n => TimeSpan.FromMilliseconds(250.0))
                .Subscribe(o);
    });

    using (sequence.Subscribe(p => Console.Write(p)))
    {
        Console.ReadLine();
    }
}

这是自取消(当您对订阅调用 .Dispose() 时)并且每 250.0 毫秒产生一次值.

This is self-cancelling (when you call .Dispose() on the subscription) and produces values every 250.0 milliseconds.

我继续使用 Observable.Create 运算符来确保 message 变量被封装在 observable 中 - 否则有人可能会更改该值message 因为 observable 正在使用它并因此破坏它.

I've continued to use the Observable.Create operator to ensure that the message variable is encapsulated within the observable - otherwise it is possible for someone to change the value of message as the observable is working with it and thus break it.

作为替代方案,这可能在内存方面效率不高,但可以自我封装,试试这个:

As an alternative, that might not be as efficient with memory, but is self-encapsulating, try this:

IObservable<char> sequence =
    Observable
        .Generate(
            "hello world!",
            n => !String.IsNullOrEmpty(n),
            n => n.Substring(1),
            n => n[0],
            n => TimeSpan.FromMilliseconds(250.0));

最后,您的问题中的调度没有任何递归".你这是什么意思?

And, finally, there's nothing "recursive" about the scheduling in your question. What did you mean by that?

我终于知道你在看什么了.我在问题中错过了.

I finally figured out what you're looking at. I missed it in the question.

以下是使用递归调度的示例:

Here's an example using the recursive scheduling:

IObservable<char> sequence = Observable.Create<char>(o =>
{
    string message = "hello world!";
    return Scheduler.Default.Schedule<string>(message, TimeSpan.FromMilliseconds(250.0), (state, schedule) =>
    {
        if (!String.IsNullOrEmpty(state))
        {
            o.OnNext(state[0]);
            schedule(state.Substring(1), TimeSpan.FromMilliseconds(250.0));
        }
        else
        {
            o.OnCompleted();
        }
    });
});

这篇关于System.Reactive 中递归调度的功能优势是什么?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆