为什么每个观察代表一个新的线程上运行 [英] Why each observation delegate runs on a new thread

查看:103
本文介绍了为什么每个观察代表一个新的线程上运行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在接收使用Scheduler.NewThread为ObserveOn方法时,是什么让每个观察委托(OnNext)上一个新的线程,运行时,接收已经保证了OnNexts永远不会重叠的优势。如果每个OnNext将被称为一个又一个,为什么需要为每个新的线程。



我明白为什么有人要在不同的线程运行观察代表除认购和应用程序线程,但是时,他们绝不会并行运行一个新的线程运行的每个观察代表?....没有任何意义,我还是我失去了一些东西?



例如

 使用系统; 
使用System.Linq的;
使用System.Reactive.Concurrency;
使用System.Reactive.Linq;
使用的System.Threading;

命名空间RxTesting
{
类节目
{
静态无效的主要(字串[] args)
{
控制台。的WriteLine(应​​用程序线程:{0},Thread.CurrentThread.ManagedThreadId);

变种人数从数=在Enumerable.Range(1,10)选择Process(号码);

VAR observableNumbers = numbers.ToObservable()
.ObserveOn(Scheduler.NewThread)
.SubscribeOn(Scheduler.NewThread);

observableNumbers.Subscribe(
N => Console.WriteLine(消费:{0}的主题\t:{1},N,Thread.CurrentThread.ManagedThreadId));

Console.ReadKey();
}

私有静态诠释过程(INT数)
{
的Thread.Sleep(500);
Console.WriteLine(生产:{0}的主题\t:{1},编号,
Thread.CurrentThread.ManagedThreadId);

返回数;
}
}
}



以上代码生成以下结果。 ,注意消费是在一个新的线程,每次做

 应用程序线程:8 
生产:1的主题: 9
耗竭与:1的主题:10
生产:2主题:关于主题3:在线程2:9
消费11
生产9
消费:3主题:12
生产:4线程:9
耗竭与:4主题:13
生产:5帖子:9
消费:5主题:14
生产:6帖子:9
耗竭与:6帖子:15
生产:7主题:9
消费:7主题:16
生产: 8月主题:9
耗竭与:8主题:17
生产:9月主题:9
消费:9月主题:18
生产:10主题:9
耗竭与:10主题:19


解决方案

的NewThread调度为长期运行的用户非常有用。如果不指定任何调度,生产商被阻塞等待用户来完成。通常情况下,你可以使用Scheduler.ThreadPool,但如果你希望有很多很多长时间运行的任务,你不会想阻塞的线程池与他们(因为它可能不仅仅是一个单一的可观测的用户使用)。



例如,请考虑您的示例中的以下修改。我搬到延迟到用户,并添加时主线程已经准备好了键盘输入的指示。 。请注意,当您取消注释NewThead线的区别



 使用系统; 
使用System.Linq的;
使用System.Reactive.Concurrency;
使用System.Reactive.Linq;
使用的System.Threading;

命名空间RxTesting
{
类节目
{
静态无效的主要(字串[] args)
{
控制台。的WriteLine(应​​用程序线程:{0},Thread.CurrentThread.ManagedThreadId);

变种号码从数Enumerable.Range(1,10)选择处理(数)=;

VAR observableNumbers = numbers.ToObservable()
// .ObserveOn(Scheduler.NewThread)
// .SubscribeOn(Scheduler.NewThread)
;

observableNumbers.Subscribe($ B $亿= GT; {
的Thread.Sleep(500);
Console.WriteLine(消费:{0}上螺纹\t :{1},正,Thread.CurrentThread.ManagedThreadId);
});

Console.WriteLine(等待键盘);
Console.ReadKey();
}

私有静态诠释过程(INT数)
{
Console.WriteLine(生产:{0}的主题\t:{1} ,号码,
Thread.CurrentThread.ManagedThreadId);

返回数;
}
}
}



那么,为什么不接收优化使用相同线程对每个用户?如果用户是如此长时间运行,你需要一个新的线程,线程创建的开销将是微不足道的反正。唯一的例外是,如果大多数用户是短,但也有少数是长时间运行的,那么优化重复使用相同的线程确实将是有益的。


In Rx when using Scheduler.NewThread for ObserveOn method, what is the advantage of having each Observation delegate (OnNext) running on a new thread, when Rx already guarantees that the OnNexts will never overlap. If every OnNext is going to be called one after another, why need new thread for each of them.

I understand why would one want to run Observation delegates on a thread different than the subscription and application thread but running each observation delegate on a new thread when they will never run in parallel?.... doesn't make sense to me or am I missing something here?

For Example

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

namespace RxTesting
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId);

            var numbers = from number in Enumerable.Range(1,10) select Process(number);

            var observableNumbers = numbers.ToObservable()
                .ObserveOn(Scheduler.NewThread)
                .SubscribeOn(Scheduler.NewThread);

            observableNumbers.Subscribe(
                n => Console.WriteLine("Consuming : {0} \t on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId));

            Console.ReadKey();
        }

        private static int Process(int number)
        {
            Thread.Sleep(500);
            Console.WriteLine("Producing : {0} \t on Thread : {1}", number,
                              Thread.CurrentThread.ManagedThreadId);

            return number;
        }
    }
}

The above code produces following result. Notice the Consuming is done on a new thread each time.

Application Thread : 8
Producing : 1    on Thread : 9
Consuming : 1    on Thread : 10
Producing : 2    on Thread : 9
Consuming : 2    on Thread : 11
Producing : 3    on Thread : 9
Consuming : 3    on Thread : 12
Producing : 4    on Thread : 9
Consuming : 4    on Thread : 13
Producing : 5    on Thread : 9
Consuming : 5    on Thread : 14
Producing : 6    on Thread : 9
Consuming : 6    on Thread : 15
Producing : 7    on Thread : 9
Consuming : 7    on Thread : 16
Producing : 8    on Thread : 9
Consuming : 8    on Thread : 17
Producing : 9    on Thread : 9
Consuming : 9    on Thread : 18
Producing : 10   on Thread : 9
Consuming : 10   on Thread : 19

解决方案

The NewThread scheduler is useful for long-running subscribers. If you don't specify any scheduler, the producer is blocked waiting the subscribers to complete. Often, you can use Scheduler.ThreadPool, but if you expect to have many many long running tasks, you won't want to clog up your thread pool with them (since it may be used by more than just the subscribers of a single observable).

For example, consider the following modification on your example. I moved the delay to the subscriber and added an indication of when the main thread was ready for keyboard input. Notice the difference when you uncomment NewThead lines.

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

namespace RxTesting
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId);

            var numbers = from number in Enumerable.Range(1, 10) select Process(number);

            var observableNumbers = numbers.ToObservable()
//              .ObserveOn(Scheduler.NewThread)
//              .SubscribeOn(Scheduler.NewThread)
            ;

            observableNumbers.Subscribe(
                n => {
                    Thread.Sleep(500);
                    Console.WriteLine("Consuming : {0} \t on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId);
                });

            Console.WriteLine("Waiting for keyboard");
            Console.ReadKey();
        }

        private static int Process(int number)
        {
            Console.WriteLine("Producing : {0} \t on Thread : {1}", number,
                              Thread.CurrentThread.ManagedThreadId);

            return number;
        }
    }
}

So why doesn't Rx optimize to use the same thread for each subscriber? If subscribers are so long running that you need a new thread, the thread creation overhead will be insignificant anyway. The one exception is that if most subscribers are short but a few are long running, then an optimization to reuse the same thread would indeed be useful.

这篇关于为什么每个观察代表一个新的线程上运行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆