处置的XmlReader与未决异步读 [英] Disposing of XmlReader with pending async read

查看:195
本文介绍了处置的XmlReader与未决异步读的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在写一个.NET XMPP库的乐趣,并为已经讨论其他地方的XmlReader 在之前的版本.NET 4.5的实现是不适合解析从XML 的NetworkStream ,因为它不会开始解析直到它填补了一个内部​​的4KB缓冲区或达到EOF。

其他库不使用的XmlReader 所有解决此得到。正如在previously链接的问题,叽里咕噜网采用的是Java的XML解析器的一个端口。我发现搜索时,巴贝尔IM的实现,使用它自己的简单的XML解析器。我不知道是什么agsXMPP一样。

然而,随着.NET 4.5的发布和新的异步功能的XmlReader 显然得到了升级,现在可以做到真正的<一个href="http://social.msdn.microsoft.com/Forums/en-US/xmlandnetfx/thread/9e73cdb3-c5d2-4426-a66b-ace363c4a80a/"相对=nofollow>异步解析。我因此用它来一起破解一个相当简单的XMPP客户端,可以连接到服务器并发送和接收消息。

症结然而,实际上似乎从服务器中的断开的。在断开我通常只是想的Dispose()我的的XmlReader 实例和底层流。然而,的Dispose()实际上将抛出一个 InvalidOperationException异常消息异步操作已在进行中。如果你把它当一个异步...好消息所说的话。但是,XMPP的性质,因为,我的的XmlReader 基本上是不断地进行,因为它等待XML节从服务器异步操作下来的管道。

有没有出现任何的 的方法对的XmlReader ,我可以用它来告诉它取消所有待执行异步操作,这样我可以的Dispose() 它干净。 有没有更好的方式来处理这种情况不是根本就没有试图处置的XmlReader 的XMPP的规范指出,服务器应该发送一个关闭&LT; /流:流&gt; 断开连接标签。我的可以的使用这个作为一个信号,不要试图进行另一次异步读作不出意外应该被灌进管道,但不能保证这一点。

下面是一些示例code一起玩。 LongLivedTextStream 基本上模拟在一个开放的的NetworkStream 它从来没有达到EOF并且将阻塞,直到至少有1个字节可以被读取。你可以注入XML文本到它其中的XmlReader 将愉快地分析,但试图出售的读者会引发上述异常。

 使用系统;
使用System.Collections.Generic;
使用System.IO;
使用System.Linq的;
使用System.Text;
使用的System.Threading;
使用System.Threading.Tasks;
使用的System.Xml;

命名空间示例
{
    类LongLivedTextStream:流
    {
        ManualResetEvent的moarDatas =新的ManualResetEvent(假);

        名单&LT;字节&GT;数据=新的名单,其中,字节&GT;();
        INT POS = 0;

        公共无效注入(文本字符串)
        {
            data.AddRange(新UTF8Encoding(假).GetBytes(文本));

            moarDatas.Set();
        }

        公众覆盖INT读(byte []的缓冲区,诠释抵消,诠释计数)
        {
            VAR字节=的GetBytes(计数).ToArray();

            的for(int i = 0;偏移+ I&LT; buffer.Length和放大器;&安培; I&LT; bytes.Length;我++)
            {
                缓冲区[偏移+我=字节[I]
            }

            返回bytes.Length;
        }

        私人的IEnumerable&LT;字节&GT;的GetBytes(诠释计数)
        {
            INT返回= 0;

            而(返回== 0)
            {
                如果(POS&L​​T; data.Count)
                {
                    而(POS&L​​T; data.Count和放大器;&放大器;返回&LT;计数)
                    {
                        产量返回数据[POS]

                        POS + = 1;返回+ = 1;
                    }
                }
                其他
                {
                    moarDatas.Reset();
                    moarDatas.WaitOne();
                }
            }
        }

        #地区其他流成员

        公众覆盖布尔的CanRead
        {
            获得{返回true; }
        }

        公众覆盖布尔CanSeek
        {
            获得{返回false; }
        }

        公众覆盖布尔CanWrite
        {
            获得{返回false; }
        }

        公众覆盖无效的同花顺(){}

        公众覆盖长的长度
        {
            获得{抛出新NotSupportedException异常(); }
        }

        公众覆盖多头头寸
        {
            获得{抛出新NotSupportedException异常(); }
            集合{抛出新NotSupportedException异常(); }
        }

        公众覆盖寻求长(长偏移,SeekOrigin原点)
        {
            抛出新NotSupportedException异常();
        }

        公众覆盖无效SetLength(long值)
        {
            抛出新NotSupportedException异常();
        }

        公共覆盖无效写入(byte []的缓冲区,诠释抵消,诠释计数)
        {
            抛出新NotSupportedException异常();
        }

        #endregion
    }

    公共类节目
    {
        公共静态无效的主要(字串[] args)
        {
            测试();
            到Console.ReadLine();
        }

        公共静态异步无效测试()
        {
            VAR流=新LongLivedTextStream();
            VAR读卡器= XmlReader.Create(流,新XmlReaderSettings(){异步=真});

            VAR T = Task.Run(()=&GT;
                {
                    stream.Inject(&LT;根&GT;);
                    Thread.sleep代码(2000);
                    stream.Inject(价值);
                    Thread.sleep代码(2000);
                    stream.Inject(&LT; /根&GT;);
                    Thread.sleep代码(2000);

                    reader.Dispose(); // InvalidOperationException异常:异步操作已在进行中。

                    Console.WriteLine(「出售」);
                });

            同时,(等待reader.ReadAsync())
            {
                布尔杀= FALSE;

                开关(reader.NodeType)
                {
                    案例XmlNodeType.Element:
                        Console.WriteLine(开始:+ reader.LocalName);
                        打破;

                    案例XmlNodeType.EndElement:
                        Console.WriteLine(结束:+ reader.LocalName);
                        //杀= TRUE; //我可以用一个特殊的EndElement作为一个信号,不尝试另一种读
                        打破;

                    案例XmlNodeType.Text:
                        Console.WriteLine(文本:+等待reader.GetValueAsync());
                        打破;
                }

                如果(杀){打破; }
            }
        }
    }
}
 

修改

本例使用实际的NetworkStream ,并表明如果我关闭()的Dispose()底层流对 ReadAsync()呼叫的XmlReader 不回假的希望,而是继续阻止。

 使用系统;
使用System.Collections.Generic;
使用System.IO;
使用System.Linq的;
使用System.Net;
使用的System.Net.Sockets;
使用System.Text;
使用的System.Threading;
使用System.Threading.Tasks;
使用的System.Xml;

命名空间示例
{
    公共类节目
    {
        公共静态无效的主要(字串[] args)
        {
            的NetworkStream流= NULL;

            VAR终点=新IPEndPoint(IPAddress.Parse(127.0.0.1),50000);

            VAR serverIsUp =新的ManualResetEvent(假);
            VAR doneWriting =新的ManualResetEvent(假);

            变种T1 = Task.Run(()=&GT;
            {
                VAR服务器=新的TcpListener(终点);
                server.Start();

                serverIsUp.Set();

                变种客户= server.AcceptTcpClient();

                VAR作家=新的StreamWriter(client.GetStream());

                writer.Write(&LT;根&GT;); writer.Flush();
                Thread.sleep代码(2000);
                writer.Write(价值); writer.Flush();
                Thread.sleep代码(2000);
                writer.Write(&LT; /根&GT;); writer.Flush();
                Thread.sleep代码(2000);

                doneWriting.Set();
            });

            变种T2 = Task.Run(()=&GT;
                {
                    doneWriting.WaitOne();

                    stream.Dispose();

                    Console.WriteLine(处置溪);
                });

            VAR T3 = Task.Run(异步()=&GT;
            {
                serverIsUp.WaitOne();

                VAR插座=新的Socket(SocketType.Stream,ProtocolType.Tcp);
                socket.Connect(终点);

                流=新的NetworkStream(插座,真正的);

                VAR读卡器= XmlReader.Create(流,新XmlReaderSettings(){异步=真});

                布尔VAL;
                而(VAL =等待reader.ReadAsync())
                {
                    布尔杀= FALSE;

                    开关(reader.NodeType)
                    {
                        案例XmlNodeType.Element:
                            Console.WriteLine(开始:+ reader.LocalName);
                            打破;

                        案例XmlNodeType.EndElement:
                            Console.WriteLine(结束:+ reader.LocalName);
                            //杀= TRUE; //我可以用一个特殊的EndElement作为一个信号,不尝试另一种读
                            打破;

                        案例XmlNodeType.Text:
                            Console.WriteLine(文本:+等待reader.GetValueAsync());
                            打破;
                    }

                    如果(杀){打破; }
                }

                //理想的情况下,一旦底层的流关闭,ReadAsync()将返回false
                //我们会来到这里,并能安全地处理读者,但事实并非如此
                // ReadAsync()继续至框
                reader.Dispose();
                Console.WriteLine(处置阅读器);
            });

            到Console.ReadLine();
        }
    }
}
 

解决方案

尝试注入手册&LT; /流:流&gt; 进入分析器。要做到这一点,你可能需要的NetworkStream和分析器,它通过了所有输入的数据分析器,但增加了另一种方法来注入℃之间适配器类; /流:流&gt; 。你要小心,你是不是在另一节中段,当你调用该方法,也许对保持分析器的输出端的状态。

I'm writing a .NET XMPP library for fun and as has been discussed elsewhere the XmlReader implementation in versions prior to .NET 4.5 was not suitable for parsing XML from a NetworkStream as it would not begin parsing until it filled an internal 4KB buffer or reached EOF.

Other libraries got around this by not using XmlReader at all. As mentioned in the previously linked question, jabber-net uses a port of a Java XML parser. An implementation I found while searching, Babel IM, uses its own simple XML parser. I'm not sure what agsXMPP does.

However, with the release of .NET 4.5 and the new async features XmlReader apparently got an upgrade and can now do true async parsing. I've thus used it to hack together a fairly simple XMPP client that can connect to a server and send and receive messages.

The sticking point however, actually seems to be in disconnecting from the server. On disconnect I would normally just want to Dispose() of my XmlReader instance and the underlying streams. However, Dispose() will actually throw an InvalidOperationException with the message "An asynchronous operation is already in progress." if you call it when an async... well what the message says. However, because of the nature of XMPP, my XmlReader is basically constantly performing an async operation as it waits for XML stanzas from the server to come down the pipe.

There do not appear to be any methods on the XmlReader that I could use to tell it to cancel any pending async operations so that I can Dispose() of it cleanly. Is there a better way to deal with this situation than simply not attempting to dispose of the XmlReader? The XMPP spec states that the server is supposed to send a closing </stream:stream> tag on disconnect. I could use this as a signal to not attempt to perform another async read as nothing else should be coming down the pipe, but there's no guarantee of this.

Here is some sample code to play with. LongLivedTextStream basically emulates an open NetworkStream in that it never reaches EOF and will block until at least 1 byte can be read. You can "inject" XML text into it which the XmlReader will happily parse, but trying to dispose of the reader will trigger the aforementioned exception.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

namespace Example
{
    class LongLivedTextStream : Stream
    {
        ManualResetEvent moarDatas = new ManualResetEvent(false);

        List<byte> data = new List<byte>();
        int pos = 0;

        public void Inject(string text)
        {
            data.AddRange(new UTF8Encoding(false).GetBytes(text));

            moarDatas.Set();
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            var bytes = GetBytes(count).ToArray();

            for (int i = 0; offset + i < buffer.Length && i < bytes.Length; i++)
            {
                buffer[offset + i] = bytes[i];
            }

            return bytes.Length;
        }

        private IEnumerable<byte> GetBytes(int count)
        {
            int returned = 0;

            while (returned == 0)
            {
                if (pos < data.Count)
                {
                    while (pos < data.Count && returned < count)
                    {
                        yield return data[pos];

                        pos += 1; returned += 1;
                    }
                }
                else
                {
                    moarDatas.Reset();
                    moarDatas.WaitOne();
                }
            }
        }

        #region Other Stream Members

        public override bool CanRead
        {
            get { return true; }
        }

        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return false; }
        }

        public override void Flush() { }

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        #endregion
    }

    public class Program
    {
        public static void Main(string[] args)
        {
            Test();
            Console.ReadLine();
        }

        public static async void Test()
        {
            var stream = new LongLivedTextStream();
            var reader = XmlReader.Create(stream, new XmlReaderSettings() { Async = true });

            var t = Task.Run(() =>
                {
                    stream.Inject("<root>");
                    Thread.Sleep(2000);
                    stream.Inject("value");
                    Thread.Sleep(2000);
                    stream.Inject("</root>");
                    Thread.Sleep(2000);

                    reader.Dispose(); // InvalidOperationException: "An asynchronous operation is already in progress."

                    Console.WriteLine("Disposed");
                });

            while (await reader.ReadAsync())
            {
                bool kill = false;

                switch (reader.NodeType)
                {
                    case XmlNodeType.Element:
                        Console.WriteLine("Start: " + reader.LocalName);
                        break;

                    case XmlNodeType.EndElement:
                        Console.WriteLine("End:   " + reader.LocalName);
                        //kill = true; // I could use a particular EndElement as a signal to not try another read
                        break;

                    case XmlNodeType.Text:
                        Console.WriteLine("Text:  " + await reader.GetValueAsync());
                        break;
                }

                if (kill) { break; }
            }
        }
    }
}

EDIT

This example uses an actual NetworkStream and shows that if I Close() or Dispose() of the underlying stream the ReadAsync() call on XmlReader does not return false as hoped, instead it continues to block.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

namespace Example
{
    public class Program
    {
        public static void Main(string[] args)
        {
            NetworkStream stream = null;

            var endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 50000);                                   

            var serverIsUp = new ManualResetEvent(false);
            var doneWriting = new ManualResetEvent(false);

            var t1 = Task.Run(() =>
            {
                var server = new TcpListener(endpoint);
                server.Start();

                serverIsUp.Set();

                var client = server.AcceptTcpClient();

                var writer = new StreamWriter(client.GetStream());

                writer.Write("<root>"); writer.Flush();
                Thread.Sleep(2000);
                writer.Write("value"); writer.Flush();
                Thread.Sleep(2000);
                writer.Write("</root>"); writer.Flush();
                Thread.Sleep(2000);

                doneWriting.Set();
            });

            var t2 = Task.Run(() =>
                {
                    doneWriting.WaitOne();

                    stream.Dispose();

                    Console.WriteLine("Disposed of Stream");
                });

            var t3 = Task.Run(async () =>
            {
                serverIsUp.WaitOne();                

                var socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
                socket.Connect(endpoint);

                stream = new NetworkStream(socket, true);

                var reader = XmlReader.Create(stream, new XmlReaderSettings() { Async = true });

                bool val;
                while (val = await reader.ReadAsync())
                {
                    bool kill = false;

                    switch (reader.NodeType)
                    {
                        case XmlNodeType.Element:
                            Console.WriteLine("Start: " + reader.LocalName);
                            break;

                        case XmlNodeType.EndElement:
                            Console.WriteLine("End:   " + reader.LocalName);
                            //kill = true; // I could use a particular EndElement as a signal to not try another read
                            break;

                        case XmlNodeType.Text:
                            Console.WriteLine("Text:  " + await reader.GetValueAsync());
                            break;
                    }

                    if (kill) { break; }
                }

                // Ideally once the underlying stream is closed, ReadAsync() would return false
                // we would get here and could safely dispose the reader, but that's not the case
                // ReadAsync() continues to block
                reader.Dispose();
                Console.WriteLine("Disposed of Reader");
            });

            Console.ReadLine();
        }
    }
}

解决方案

Try injecting a manual </stream:stream> into the parser. To do this, you may need an adapter class between the NetworkStream and the parser, which passes all of the incoming data to the parser but adds another method to inject the </stream:stream>. You'll need to be careful that you're not in the middle of another stanza when you call that method, perhaps by keeping state on the output side of the parser.

这篇关于处置的XmlReader与未决异步读的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆