什么是创建一个可观察其内容的流结束的正确方法 [英] What is the proper way to create an Observable which reads a stream to the end

查看:156
本文介绍了什么是创建一个可观察其内容的流结束的正确方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我挣扎在这里。一般情况下我读了一本书,但目前还没有任何呢。我发现使用RX伴读数据流做各种事情的例子不胜枚举,但我发现它很难左右让我的头。



我知道我可以使用。Observable.FromAsyncPattern创建的流的的BeginRead / EndRead或BeginReadLine / EndReadLine方法的包装



但是,这只是一次读取 - 第一观察者订阅的时候。

我想要一个观测这将保持阅读和OnNext泵送直至流错误或结束



在除此之外,我也想知道我怎么就可以共享观察到与多个用户,使他们都得到了项目。


解决方案

的解决方法是使用Observable.Create



下面是可以者进行调整用于读取任何一种流的示例

 公共静态IConnectableObservable<指挥与GT; GetReadObservable(这CommandReader读卡器)
{

返回Observable.Create<指挥与GT;(异步(主题,令牌)=>
{



{

,而(真)
{

如果(token.IsCancellationRequested)
{
受.OnCompleted();
的回报;
}

//这里这部分可以改变这样的事情
// INT收到=等待Task.Factory。 FromAsync< INT>(innerSocket.BeginReceive(数据,偏移,大小,SocketFlags.None,NULL,NULL),innerSocket.EndReceive);

指令CMD =等待reader.ReadCommandAsync();

subject.OnNext(CMD);

}

}

赶上(异常前)
{

{
subject.OnError(除息);
}
赶上(例外)
{
的Debug.WriteLine(一个异常,而试图调用的OnError上观察到的题目抛出 - 意味着你不用到处捕捉异常);
抛出;
}
}

})发布();

}



不要忘了调用Connect()的返回IConnectableObservable


I'm struggling here. Normally I'd read a book but there aren't any yet. I've found countless examples of various things to do with reading streams using RX but I'm finding it very hard to get my head around.

I know I can use Observable.FromAsyncPattern to create a wrapper of the Stream's BeginRead/EndRead or BeginReadLine/EndReadLine methods.

But this only reads once -- when the first observer subscribes.

I want an Observable which will keep reading and pumping OnNext until the stream errors or ends.

In addition to this, I'd also like to know how I can then share that observable with multiple subscribers so they all get the items.

解决方案

The solution is to use Observable.Create

Here is an example which can be adapated for reading any kind of stream

    public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader)
    {

       return Observable.Create<Command>(async (subject, token) =>
        {


            try
            {

                while (true)
                {

                    if (token.IsCancellationRequested)
                    {
                        subject.OnCompleted();
                        return;
                    }

                    //this part here can be changed to something like this
                    //int received = await Task.Factory.FromAsync<int>(innerSocket.BeginReceive(data, offset, size, SocketFlags.None, null, null), innerSocket.EndReceive);

                    Command cmd = await reader.ReadCommandAsync();

                    subject.OnNext(cmd);

                }

            }

            catch (Exception ex)
            {
                try
                {
                    subject.OnError(ex);
                }
                catch (Exception)
                {
                    Debug.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere");
                    throw;
                }
            }

        }).Publish();

    }

Don't forget to call Connect() on the returned IConnectableObservable

这篇关于什么是创建一个可观察其内容的流结束的正确方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆