NewThreadScheduler.Default 将所有工作安排在同一线程上 [英] NewThreadScheduler.Default schedules all work on same thread

查看:33
本文介绍了NewThreadScheduler.Default 将所有工作安排在同一线程上的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我目前正试图用 RX .NET 解决并发问题,但对某些事情感到困惑.我想并行运行四个相对较慢的任务,所以我假设 NewThreadScheduler.Default 将是要走的路,因为它"代表一个对象,它在单独的线程上调度每个工作单元.".

I'm currently trying to wrap my head around concurrency with RX .NET and getting confused by something. I want to run four relatively slow tasks in parallel, so I assumed NewThreadScheduler.Default would be the way to go, as it "Represents an object that schedules each unit of work on a separate thread.".

这是我的设置代码:

    static void Test()
    {
        Console.WriteLine("Starting. Thread {0}", Thread.CurrentThread.ManagedThreadId);

        var query = Enumerable.Range(1, 4);
        var obsQuery = query.ToObservable(NewThreadScheduler.Default);
        obsQuery.Subscribe(DoWork, Done);

        Console.WriteLine("Last line. Thread {0}", Thread.CurrentThread.ManagedThreadId);
    }

    static void DoWork(int i)
    {
        Thread.Sleep(500);
        Console.WriteLine("{0} Thread {1}", i, Thread.CurrentThread.ManagedThreadId);
    }

    static void Done()
    {
        Console.WriteLine("Done. Thread {0}", Thread.CurrentThread.ManagedThreadId);
    }

我假设X Thread Y"每次都会输出不同的线程ID,但实际输出是:

I assumed the "X Thread Y" would output a different thread id every time, however the actual output is:

Starting. Thread 1
Last line. Thread 1
1 Thread 3
2 Thread 3
3 Thread 3
4 Thread 3
Done. Thread 3

所有工作都按顺序在同一个新线程上进行,这不是我所期望的.

All the work is being one on the same new thread in sequential order, which isn't what I was expecting.

我假设我遗漏了一些东西,但我不知道是什么.

I'm assuming I'm missing something, but I can't figure out what.

推荐答案

一个可观察的查询有两部分,Query 本身和 Subscription.(这也是 ObserveOn 和 SubscribeOn 运算符之间的区别.)

There are two parts to an observable query, the Query itself and the Subscription. (This is also the difference between the ObserveOn and SubscribeOn operators.)

您的查询

Enumerable
    .Range(1, 4)
    .ToObservable(NewThreadScheduler.Default);

这将创建一个 observable,它在该系统的默认 NewThreadScheduler 上生成值.

This creates an observable that produces values on the default NewThreadScheduler for that system.

您的订阅是

obsQuery.Subscribe(DoWork, Done);

QueryQuery 结束时,这会为 QueryDone 产生的每个值运行 DoWorkcode>OnComplete 调用.我认为对于 subscribe 方法中的函数将在哪个线程上调用没有任何保证,实际上,如果查询的所有值都在与运行订阅的线程相同的线程上生成.看来他们也在这样做,因此所有订阅调用都在同一个线程上进行,这很可能是为了消除许多常见的多线程错误.

This runs DoWork for each value produced by the Query and Done when the Query finishes with an OnComplete call. I don't think there are any guarantees about what thread the functions in the subscribe method will be called on, in practice if all values of the query are produced on the same thread that is the thread the subscription will be run on. It appears they are also making it so all of the subscription calls are made on the same thread which is most likely done to get rid of a lot of common multi-threading errors.

所以您有两个问题,一个是您的日志记录,如果您将 Query 更改为

So you have two issues, one is with your logging, if you change your Query to

Enumerable
    .Range(1, 4)
    .Do(x => Console.WriteLine("Query Value {0} produced on Thread {1}", x, Thread.CurrentThread.ManagedThreadId);
    .ToObservable(NewThreadScheduler.Default);

您将看到在新线程上生成的每个值.

You'll see each value produced on a new thread.

另一个问题是 Rx 的意图和设计之一.预期 Query 是一个长期运行的过程,而 Subscription 是一个处理结果的简短方法.如果您想将长时间运行的函数作为 Rx Observable 运行,您最好的选择是使用 Observable.ToAsync.

The other issue is one of the intention and design of Rx. It's intended that the Query is the long-running process and the Subscription is a short method that deals with the results. If you want to run a long running function as an Rx Observable your best option is to use Observable.ToAsync.

这篇关于NewThreadScheduler.Default 将所有工作安排在同一线程上的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆