快速重复TakeWhile导致无限循环 [英] Fast Repeat TakeWhile causes infinite loop

查看:154
本文介绍了快速重复TakeWhile导致无限循环的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何使以下可观察的重复发生,直到stream.DataAvailable为false? 目前看来它永远不会停止.

How can I make the following observable repeat until stream.DataAvailable is false? Currently it looks like it never stops.

AsyncReadChunk和Observable.在Defer部分内部返回,先进行OnNext调用,再进行OnCompleted调用. 当Repeat收到OnNext调用时,会将其传递给TakeWhile.当TakeWhile's不满意时,它会完成可观察到的操作,但是我认为OnNext之后出现的OnCompleted是如此之快,以至于Repeat重新订阅了可观察到的操作,并导致了无限循环.

AsyncReadChunk and Observable.Return inside the Defer section make OnNext call then OnCompleted call. When Repeat receives the OnNext call it passes it to TakeWhile. When TakeWhile's is not satisfied it completes the observable but I think the OnCompleted that comes right after the OnNext is so fast that it makes Repeat to re-subscribes to the observable and causes the infinite loop.

我该如何纠正这种行为?

How can I correct this behaviour?

public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize)
{
    return Observable.Defer(() =>
        {
            try
            {
                return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0]);
            }
            catch (Exception)
            {
                return Observable.Return(new byte[0]);
            }
        })
        .Repeat()
        .TakeWhile((dataChunk, index) => dataChunk.Length > 0);
}

推荐答案


SELF ANSWER:(下面是问题作者Samet发布的答案.但是,他将答案发布为问题的一部分.我将其移至单独的答案中,标记为社区Wiki,因为作者自己尚未移动它.)


SELF ANSWER: (Below is an answer posted by Samet, the author of the question. However, he posted the answer as part of the question. I'm moving it into a separate answer, marking as community wiki, since the author hasn't moved it himself.)

我通过重构发现调度程序存在问题. Return使用立即调度程序,而Repeat使用CurrentThread.固定代码如下.

I discovered by refactoring that it is a problem with schedulers. The Return uses Immediate scheduler while Repeat uses CurrentThread. The fixed code is below.

    public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize)
    {
        return Observable.Defer(() =>
                                    {
                                        try
                                        {
                                            return stream.DataAvailable ? AsyncReadChunk(stream, bufferSize) : Observable.Return(new byte[0], Scheduler.CurrentThread);
                                        }
                                        catch (Exception)
                                        {
                                            return Observable.Return(new byte[0], Scheduler.CurrentThread);
                                        }
                                    })
            .Repeat()
            .TakeWhile((dataChunk, index) => dataChunk.Length > 0);
    }

这篇关于快速重复TakeWhile导致无限循环的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆