Why is IEnumerable.ToObservable so slow?

Question

I am trying to enumerate a large IEnumerable once, and observe the enumeration with various operators attached (Count, Sum, Average, etc.). The obvious way is to transform it into an IObservable with the ToObservable method, and then subscribe an observer to it. I noticed that this is much slower than other methods, such as a simple loop that notifies the observer on each iteration, or using Observable.Create instead of ToObservable. The difference is substantial: it's 20-30 times slower. Is that just how it is, or am I doing something wrong?

using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;

public static class Program
{
    static void Main(string[] args)
    {
        const int COUNT = 10_000_000;
        Method1(COUNT);
        Method2(COUNT);
        Method3(COUNT);
    }

    static void Method1(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        source.ToObservable().Subscribe(subject);
        Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method2(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        foreach (var item in source) subject.OnNext(item);
        subject.OnCompleted();
        Console.WriteLine($"Loop & Notify: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }

    static void Method3(int count)
    {
        var source = Enumerable.Range(0, count);
        var subject = new Subject<int>();
        var stopwatch = Stopwatch.StartNew();
        Observable.Create<int>(o =>
        {
            foreach (var item in source) o.OnNext(item);
            o.OnCompleted();
            return Disposable.Empty;
        }).Subscribe(subject);
        Console.WriteLine($"Observable.Create: {stopwatch.ElapsedMilliseconds:#,0} msec");
    }
}

Output:

ToObservable: 7,576 msec
Loop & Notify: 273 msec
Observable.Create: 511 msec

.NET Core 3.0, C# 8, System.Reactive 4.3.2, Windows 10, Console App, Release built

Update: Here is an example of the actual functionality I want to achieve:

var source = Enumerable.Range(0, 10_000_000).Select(i => (long)i);
var subject = new Subject<long>();
var cntTask = subject.Count().ToTask();
var sumTask = subject.Sum().ToTask();
var avgTask = subject.Average().ToTask();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"Count: {cntTask.Result:#,0}, Sum: {sumTask.Result:#,0}, Average: {avgTask.Result:#,0.0}");

Output:

Count: 10,000,000, Sum: 49,999,995,000,000, Average: 4,999,999.5

The important difference between this approach and using standard LINQ operators is that the source enumerable is enumerated only once (calling Count(), Sum() and Average() directly on source would enumerate it three times).
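To make the single-pass point concrete without Rx, here is a minimal sketch using only the BCL. SinglePassDemo, Source, SinglePass and ThreeLinqPasses are hypothetical names introduced for illustration. SinglePass computes all three aggregates in one foreach (roughly what the Loop & Notify approach achieves), and the instrumented Source iterator counts how many times the sequence is actually enumerated.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SinglePassDemo
{
    // Hypothetical instrumentation: incremented each time an enumeration of
    // Source() actually starts (an iterator body runs on the first MoveNext).
    public static int Enumerations;

    public static IEnumerable<long> Source(int count)
    {
        Enumerations++;
        for (int i = 0; i < count; i++) yield return i;
    }

    // One pass computes Count, Sum and Average together.
    public static (long Count, long Sum, double Average) SinglePass(IEnumerable<long> source)
    {
        long count = 0, sum = 0;
        foreach (var x in source)
        {
            count++;
            sum += x;
        }
        // LINQ's Average throws on an empty sequence; this sketch returns NaN instead.
        double average = count == 0 ? double.NaN : (double)sum / count;
        return (count, sum, average);
    }

    // Standard LINQ operators: each call starts its own enumeration of source.
    public static (long Count, long Sum, double Average) ThreeLinqPasses(IEnumerable<long> source)
    {
        return (source.LongCount(), source.Sum(), source.Average());
    }
}
```

With Source(10), both methods return (10, 45, 4.5), but SinglePass raises Enumerations by 1 while ThreeLinqPasses raises it by 3.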

One more observation: using ToObservable(Scheduler.Immediate) is slightly faster (about 20%) than ToObservable().

Recommended answer

This is the difference between a well-behaved observable and a "roll-your-own-because-you-think-faster-is-better-but-it-is-not" observable.

When you dig far enough into the Rx source code, you discover this lovely little line:

scheduler.Schedule(this, (IScheduler innerScheduler, _ @this) => @this.LoopRec(innerScheduler));

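This is effectively calling hasNext = enumerator.MoveNext(); once per scheduled recursive iteration, so every element pays for a trip through the scheduler instead of being a single step of a tight loop.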
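The cost difference can be pictured with a toy model. The sketch below is hypothetical and heavily simplified; it is not Rx's implementation, and ToyTrampoline and IterationModels are names invented here. It contrasts a direct loop with a loop in which every MoveNext is issued from a separately queued work item, the same shape as the LoopRec line quoted above. Both deliver the same values in the same order; the scheduled version simply does more work per element.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical trampoline: queued work items run one at a time on the
// thread that calls Run(). This is NOT Rx's CurrentThreadScheduler.
public sealed class ToyTrampoline
{
    private readonly Queue<Action> _queue = new Queue<Action>();

    public void Schedule(Action work) => _queue.Enqueue(work);

    public void Run()
    {
        while (_queue.Count > 0)
        {
            _queue.Dequeue()();
        }
    }
}

public static class IterationModels
{
    // Direct loop: per element, one MoveNext plus one onNext call.
    public static void Direct<T>(IEnumerable<T> source, Action<T> onNext)
    {
        foreach (var item in source) onNext(item);
    }

    // Scheduled loop: per element, one MoveNext plus one onNext call, plus
    // a delegate handed to Schedule, an enqueue, a dequeue and an invocation.
    public static void Scheduled<T>(IEnumerable<T> source, Action<T> onNext, ToyTrampoline scheduler)
    {
        var enumerator = source.GetEnumerator();

        void Step()
        {
            if (enumerator.MoveNext())
            {
                onNext(enumerator.Current);
                scheduler.Schedule(Step); // the next MoveNext becomes a new work item
            }
            else
            {
                enumerator.Dispose();
            }
        }

        scheduler.Schedule(Step);
        scheduler.Run();
    }
}
```

Because each step goes back through the queue rather than recursing on the stack, the scheduled loop cannot overflow the stack, which is part of why a scheduler-based design accepts the extra per-element overhead.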

This allows you to choose the scheduler for your .ToObservable(schedulerOfYourChoice) call.

With the other options you've chosen, you've created a bare-bones series of calls to .OnNext that do virtually nothing. Method2 doesn't even have a .Subscribe call.

Both Method2 and Method3 run on the current thread, and both run to completion before the subscription is finished. They are blocking calls, and they can cause race conditions.
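The blocking behavior can be observed without Rx, using only the BCL's IObservable<T> and IObserver<T>. SyncRangeObservable, LoggingObserver and BlockingDemo are hypothetical types that mimic Method3's Observable.Create lambda: all values are pushed inside Subscribe, so by the time Subscribe returns the sequence has already completed, and disposing the returned handle cannot cancel anything.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Method3's Observable.Create lambda.
public sealed class SyncRangeObservable : IObservable<int>
{
    private readonly int _count;

    public SyncRangeObservable(int count) => _count = count;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Every notification is delivered before Subscribe returns.
        for (int i = 0; i < _count; i++) observer.OnNext(i);
        observer.OnCompleted();
        return NoopDisposable.Instance; // like Disposable.Empty: nothing left to cancel
    }

    private sealed class NoopDisposable : IDisposable
    {
        public static readonly NoopDisposable Instance = new NoopDisposable();
        public void Dispose() { }
    }
}

// Records notifications in arrival order.
public sealed class LoggingObserver : IObserver<int>
{
    public List<string> Log { get; } = new List<string>();
    public void OnNext(int value) => Log.Add($"OnNext({value})");
    public void OnError(Exception error) => Log.Add($"OnError({error.Message})");
    public void OnCompleted() => Log.Add("OnCompleted");
}

public static class BlockingDemo
{
    public static List<string> Run(int count)
    {
        var observer = new LoggingObserver();
        using (new SyncRangeObservable(count).Subscribe(observer))
        {
            // Reached only after OnCompleted has already been delivered.
            observer.Log.Add("Subscribe returned");
        }
        return observer.Log;
    }
}
```

BlockingDemo.Run(3) returns OnNext(0), OnNext(1), OnNext(2), OnCompleted, Subscribe returned, in that order.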

Method1 is the only one that behaves nicely as an observable. It routes every step through a scheduler, so, given an asynchronous scheduler, it can run asynchronously and independently of the subscriber.

Do keep in mind that observables are collections that run over time. They typically have an async source or a timer, or they respond to external stimuli. They don't often run off a plain enumerable. If you're working with an enumerable, you should expect synchronous code to run faster.

Speed is not the goal of Rx. Performing complex queries on time-based, pushed values is the goal.
