为什么重复将Enumerable转换为Observable转换块 [英] Why does repeated Enumerable to Observable conversion block
问题描述
出于好奇,这是一个颇有教育意义的问题.请考虑以下代码段:
This is a rather educational, out of curiosity question. Consider the following snippet:
var enumerable = Enumerable.Range(0, 5);
var observable = enumerable.ToObservable();
var enu = observable.Concat(observable).ToEnumerable();
enu.ToObservable().SubscribeDebug();
SubscribeDebug
订阅了一个简单的观察者的地方:
Where SubscribeDebug
subscribes a simple observer:
public class DebugObserver<T> : IObserver<T>
{
public void OnCompleted()
{
Debug.WriteLine("Completed");
}
public void OnError(Exception error)
{
Debug.WriteLine("Error");
}
public void OnNext(T value)
{
Debug.WriteLine("Value: {0}", value);
}
}
此输出为:
值:0
值:1
值:2
值:3
值:4
然后阻止.有人可以帮助我了解发生这种情况的根本原因以及为什么该可观察项没有完成的原因吗?我注意到,它无需Concat
调用即可完成,但会阻塞.
And then blocks. Can someone help me understand the underlying reason why it happens and why the observable does not complete? I have noticed that it does complete without the Concat
call, but blocks with it.
推荐答案
I've looked at the source of ToObservable
and distilled a minimal implementation. It does reproduce the behavior we're seeing.
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
ToObservableEx(enumerable, CurrentThreadScheduler.Instance);
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
Observable.Create<T>
(
observer =>
{
IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
{
if (enumerator.MoveNext())
{
observer.OnNext(enumerator.Current);
inner.Schedule(enumerator, loopRec); //<-- culprit
}
else
{
observer.OnCompleted();
}
// ToObservable.cs Line 117
// We never allow the scheduled work to be cancelled.
return Disposable.Empty;
}
return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
}
);
将其排除在外-问题的症结在于CurrentThreadScheduler
的行为,这是使用的默认调度程序.
With that out of the way - the crux of the problem lies in the behavior of CurrentThreadScheduler
, which is the default scheduler used.
CurrentThreadScheduler
的行为是,如果在调用Schedule
的过程中已经运行了计划,则最终将其排入队列.
The behavior of CurrentThreadScheduler
is that if a schedule is already running while Schedule
is being called - it ends up being queued.
CurrentThreadScheduler.Instance.Schedule(() =>
{
CurrentThreadScheduler.Instance.Schedule(() =>
Console.WriteLine(1)
);
Console.WriteLine(2);
});
这将打印2 1
.这种排队行为是我们的撤销.
This prints 2 1
. This queuing behavior is our undoing.
当调用observer.OnCompleted()
时,它将导致Concat
开始下一个枚举-但是,情况与开始时不同-当我们尝试安排下一个枚举时,我们仍然在observer => { }
块内一.因此,不是立即执行,而是将下一个计划排入队列.
When observer.OnCompleted()
is called, it causes Concat
to start the next enumeration - however, things are not the same as when we started out - we are still inside the observer => { }
block when we try to schedule the next one. So instead of executing immediately, the next schedule gets queued.
现在enumerator.MoveNext()
陷入死锁.
它不能移动到下一个项目-MoveNext
处于阻塞状态,直到下一个项目到达-该项目只有在ToObservable
循环安排时才能到达.
Now enumerator.MoveNext()
is caught in a dead-lock.
It can't move to the next item - MoveNext
is blocking until the next item arrives - which can only arrive when scheduled by the ToObservable
loop.
但是,调度程序只能工作,以通知ToEnumerable
并随后通知MoveNext()
被阻止的对象-一旦退出loopRec
,它就不能通知,因为它首先被MoveNext
阻止了.
But the Scheduler can only work to notify ToEnumerable
and subsequently MoveNext()
which is being held up - once it exits loopRec
- which it can't because it's being blocked by MoveNext
in the first place.
附录
This is approximately what ToEnumerable
(from GetEnumerator.cs) does (not a valid implementation):
public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
{
var gate = new SemaphoreSlim(0);
var queue = new ConcurrentQueue<T>();
using(observable.Subscribe(
value => { queue.Enqueue(value); gate.Release(); },
() => gate.Release()))
while (true)
{
gate.Wait(); //this is where it blocks
if (queue.TryDequeue(out var current))
yield return current;
else
break;
}
}
在产生下一项之前,Enumerables将被阻塞-这就是为什么要实现门控的原因.不是Enumerable.Range
会阻止,而是ToEnumerable
.
Enumerables are expected to be blocking until the next item is yielded - and that's why there's a gating implementation. It's not Enumerable.Range
which blocks, but ToEnumerable
.
这篇关于为什么重复将Enumerable转换为Observable转换块的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!