Observable.Repeat是不可阻挡的,它是错误还是功能? [英] The Observable.Repeat is unstoppable, is it a bug or a feature?

查看:109
本文介绍了Observable.Repeat是不可阻挡的,它是错误还是功能?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我注意到 Repeat 运算符的行为有些奇怪源可观察者的通知是同步的.后续的 TakeWhile 运算符无法停止产生的可观测值,并且显然继续运行永远.为了演示,我创建了一个可观察的源,该源产生一个值,该值在每次订阅时都会递增.第一个订户获得值1,第二个订户获得值2等等:

I noticed something strange with the behavior of the Repeat operator, when the source observable's notifications are synchronous. The resulting observable cannot be stopped with a subsequent TakeWhile operator, and apparently continues running forever. For demonstration I created a source observable that produces a single value, which it is incremented on every subscription. The first subscriber gets the value 1, the second gets the value 2 etc:

int incrementalValue = 0;
var incremental = Observable.Create<int>(async o =>
{
    await Task.CompletedTask;
    //await Task.Yield();

    Thread.Sleep(100);
    var value = Interlocked.Increment(ref incrementalValue);
    o.OnNext(value);
    o.OnCompleted();
});

然后我将运算符RepeatTakeWhileLastAsync附加到此可观察对象,以便程序将等待直到所组合的可观察对象产生其最后一个值:

Then I attached the operators Repeat, TakeWhile and LastAsync to this observable, so that the program will wait until the composed observable produces its last value:

incremental.Repeat()
    .Do(new CustomObserver("Checkpoint A"))
    .TakeWhile(item => item <= 5)
    .Do(new CustomObserver("Checkpoint B"))
    .LastAsync()
    .Do(new CustomObserver("Checkpoint C"))
    .Wait();
Console.WriteLine($"Done");

class CustomObserver : IObserver<int>
{
    private readonly string _name;
    public CustomObserver(string name) => _name = name;
    public void OnNext(int value) => Console.WriteLine($"{_name}: {value}");
    public void OnError(Exception ex) => Console.WriteLine($"{_name}: {ex.Message}");
    public void OnCompleted() => Console.WriteLine($"{_name}: Completed");
}

以下是该程序的输出:

Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Checkpoint A: 7
Checkpoint A: 8
Checkpoint A: 9
Checkpoint A: 10
Checkpoint A: 11
Checkpoint A: 12
Checkpoint A: 13
Checkpoint A: 14
Checkpoint A: 15
Checkpoint A: 16
Checkpoint A: 17
...

它永远不会结束!尽管LastAsync已产生其值并完成了操作,但Repeat运算符仍在旋转!

It never ends! Although the LastAsync has produced its value and has completed, the Repeat operator keeps spinning!

仅当可观察的源同步通知其订阅者时,才会发生这种情况.例如,取消注释//await Task.Yield();行后,程序将按预期方式运行:

This happens only if the source observable notifies its subscribers synchronously. For example after uncommenting the line //await Task.Yield();, the program behaves as expected:

Checkpoint A: 1
Checkpoint B: 1
Checkpoint A: 2
Checkpoint B: 2
Checkpoint A: 3
Checkpoint B: 3
Checkpoint A: 4
Checkpoint B: 4
Checkpoint A: 5
Checkpoint B: 5
Checkpoint A: 6
Checkpoint B: Completed
Checkpoint C: 5
Checkpoint C: Completed
Done

Repeat运算符停止旋转,尽管它没有报告完成(我猜是已经取消订阅).

The Repeat operator stops spinning, although it does not report completion (my guess is that it has been unsubscribed).

是否有任何方法可以通过Repeat运算符实现一致的行为,而不管其接收到的通知的类型(同步还是异步)?

Is there any way to achieve consistent behavior from the Repeat operator, irrespective of the type of notifications it receives (sync or async)?

.NET Core 3.0,C#8,System.Reactive 4.3.2,控制台应用程序

.NET Core 3.0, C# 8, System.Reactive 4.3.2, Console Application

推荐答案

您可能希望Repeat的实现具有OnCompleted通知功能,但是它会变成

You might expect an implementation of Repeat to feature the OnCompleted notification, but it turns it's implemented in terms of Concat-ing an infinite stream.

    public static IObservable<TSource> Repeat<TSource>(this IObservable<TSource> source) =>
        RepeatInfinite(source).Concat();

    private static IEnumerable<T> RepeatInfinite<T>(T value)
    {
        while (true)
        {
            yield return value;
        }
    }

将职责转移到Concat-我们可以创建一个简化的版本(详细的实现细节在TailRecursiveSink.cs中).除非await Task.Yield()提供了不同的执行上下文,否则这仍然会继续旋转.

With that responsibility shifted to Concat - we can create a simplified version (the gory implementation details are in TailRecursiveSink.cs). This still keeps on spinning unless there's a different execution context provided by await Task.Yield().

public static IObservable<T> ConcatEx<T>(this IEnumerable<IObservable<T>> enumerable) =>
    Observable.Create<T>(observer =>
    {
        var check = new BooleanDisposable();

        IDisposable loopRec(IScheduler inner, IEnumerator<IObservable<T>> enumerator)
        {
            if (check.IsDisposed)
                return Disposable.Empty;

            if (enumerator.MoveNext()) //this never returns false
                return enumerator.Current.Subscribe(
                    observer.OnNext,
                    () => inner.Schedule(enumerator, loopRec) //<-- starts next immediately
                );
            else
                return inner.Schedule(observer.OnCompleted); //this never runs
        }

        Scheduler.Immediate.Schedule(enumerable.GetEnumerator(), loopRec); //this runs forever
        return check;
    });

作为无限流,enumerator.MoveNext()始终返回true,因此另一个分支永远不会运行-这是预料之中的;这不是我们的问题.

Being an infinite stream, enumerator.MoveNext() always returns true, so the other branch never runs - that's expected; it's not our problem.

当调用o.OnCompleted()时,它将立即安排下一个迭代循环 Schedule(enumerator, loopRec)同步调用下一个o.OnCompleted(),并且继续无限进行-没有意义可以逃脱此递归.

When the o.OnCompleted() is called, it immediately schedules the next iterative loop in Schedule(enumerator, loopRec) which synchronously calls the next o.OnCompleted(), and it continues ad infinitum - there's no point where it can escape this recursion.

如果使用await Task.Yield()进行上下文切换,则Schedule(enumerator, loopRec)立即退出,并且o.OnCompleted()被非同步调用.

If you have a context switch with await Task.Yield(), then Schedule(enumerator, loopRec) exits immediately, and o.OnCompleted() is called non-synchronously.

RepeatConcat使用当前线程在不更改上下文的情况下完成工作-这不是不正确的行为,但是当使用相同的上下文推送通知时,它也可能导致死锁或被捕获 永久蹦床.

Repeat and Concat use the current thread to do work without changing the context - that's not incorrect behavior, but when the same context is used to push notifications as well, it can lead to deadlocks or being caught in a perpetual trampoline.

带注释的调用堆栈

[External Code] 
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code] 
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...)
[External Code] 
ConcatEx.AnonymousMethod__2() //inner.Schedule(enumerator, loopRec)
[External Code] 
Main.AnonymousMethod__0(o) //o.OnCompleted();
[External Code] 
ConcatEx.__loopRec|1(inner, enumerator) //return enumerator.Current.Subscribe(...)
[External Code] 
ConcatEx.AnonymousMethod__0(observer) //Scheduler.Immediate.Schedule(...)
[External Code] 
Main(args) //incremental.RepeatEx()...

这篇关于Observable.Repeat是不可阻挡的,它是错误还是功能?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆