为什么重复将Enumerable转换为Observable转换块 [英] Why does repeated Enumerable to Observable conversion block

查看:82
本文介绍了为什么重复将Enumerable转换为Observable转换块的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

出于好奇,这是一个颇有教育意义的问题.请考虑以下代码段:

This is a rather educational, out of curiosity question. Consider the following snippet:

var enumerable = Enumerable.Range(0, 5);
var observable = enumerable.ToObservable();
var enu = observable.Concat(observable).ToEnumerable();
enu.ToObservable().SubscribeDebug();

SubscribeDebug订阅了一个简单的观察者的地方:

Where SubscribeDebug subscribes a simple observer:

public class DebugObserver<T> : IObserver<T>
{
    public void OnCompleted()
    {
        Debug.WriteLine("Completed");
    }

    public void OnError(Exception error)
    {
        Debug.WriteLine("Error");
    }

    public void OnNext(T value)
    {
        Debug.WriteLine("Value: {0}", value);
    }
}

此输出为:

值:0

值:1

值:2

值:3

值:4

然后阻止.有人可以帮助我了解发生这种情况的根本原因以及为什么该可观察项没有完成的原因吗?我注意到,它无需Concat调用即可完成,但会阻塞.

And then blocks. Can someone help me understand the underlying reason why it happens and why the observable does not complete? I have noticed that it does complete without the Concat call, but blocks with it.

推荐答案

我看过

I've looked at the source of ToObservable and distilled a minimal implementation. It does reproduce the behavior we're seeing.

    public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
        ToObservableEx(enumerable, CurrentThreadScheduler.Instance);

    public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
        Observable.Create<T>
        (
            observer =>
            {
                IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
                {
                    if (enumerator.MoveNext()) 
                    {
                        observer.OnNext(enumerator.Current);
                        inner.Schedule(enumerator, loopRec); //<-- culprit
                    }
                    else
                    {
                        observer.OnCompleted();
                    }

                    // ToObservable.cs Line 117
                    // We never allow the scheduled work to be cancelled. 
                    return Disposable.Empty;
                }

                return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
            }
        );

将其排除在外-问题的症结在于CurrentThreadScheduler的行为,这是使用的默认调度程序.

With that out of the way - the crux of the problem lies in the behavior of CurrentThreadScheduler, which is the default scheduler used.

CurrentThreadScheduler的行为是,如果在调用Schedule的过程中已经运行了计划,则最终将其排入队列.

The behavior of CurrentThreadScheduler is that if a schedule is already running while Schedule is being called - it ends up being queued.

        CurrentThreadScheduler.Instance.Schedule(() =>
        {
            CurrentThreadScheduler.Instance.Schedule(() =>
                Console.WriteLine(1)
            );

            Console.WriteLine(2);
        });

这将打印2 1.这种排队行为是我们的撤销.

This prints 2 1. This queuing behavior is our undoing.

当调用observer.OnCompleted()时,它将导致Concat开始下一个枚举-但是,情况与开始时不同-当我们尝试安排下一个枚举时,我们仍然在observer => { }块内一.因此,不是立即执行,而是将下一个计划排入队列.

When observer.OnCompleted() is called, it causes Concat to start the next enumeration - however, things are not the same as when we started out - we are still inside the observer => { } block when we try to schedule the next one. So instead of executing immediately, the next schedule gets queued.

现在enumerator.MoveNext()陷入死锁. 它不能移动到下一个项目-MoveNext处于阻塞状态,直到下一个项目到达-该项目只有在ToObservable循环安排时才能到达.

Now enumerator.MoveNext() is caught in a dead-lock. It can't move to the next item - MoveNext is blocking until the next item arrives - which can only arrive when scheduled by the ToObservable loop.

但是,调度程序只能工作,以通知ToEnumerable并随后通知MoveNext()被阻止的对象-一旦退出loopRec,它就不能通知,因为它首先被MoveNext阻止了.

But the Scheduler can only work to notify ToEnumerable and subsequently MoveNext() which is being held up - once it exits loopRec - which it can't because it's being blocked by MoveNext in the first place.

附录

这大约是ToEnumerable(来自

This is approximately what ToEnumerable (from GetEnumerator.cs) does (not a valid implementation):

    public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
    {
        var gate = new SemaphoreSlim(0);
        var queue = new ConcurrentQueue<T>();

        using(observable.Subscribe(
            value => { queue.Enqueue(value); gate.Release(); }, 
            () => gate.Release()))
        while (true)
        {
            gate.Wait(); //this is where it blocks                

            if (queue.TryDequeue(out var current))
                yield return current;
            else
                break;
        }
    }

在产生下一项之前,Enumerables将被阻塞-这就是为什么要实现门控的原因.不是Enumerable.Range会阻止,而是ToEnumerable.

Enumerables are expected to be blocking until the next item is yielded - and that's why there's a gating implementation. It's not Enumerable.Range which blocks, but ToEnumerable.

这篇关于为什么重复将Enumerable转换为Observable转换块的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆