使用挂起的异步读取处理 XmlReader [英] Disposing of XmlReader with pending async read

查看:21
本文介绍了使用挂起的异步读取处理 XmlReader的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在编写一个 .NET XMPP 库是为了好玩,正如在其他地方.NET 4.5 之前版本中的 XmlReader 实现不适合从 NetworkStream 解析 XML,因为它在填充内部 4KB 缓冲区或达到 EOF 之前不会开始解析.

I'm writing a .NET XMPP library for fun and as has been discussed elsewhere the XmlReader implementation in versions prior to .NET 4.5 was not suitable for parsing XML from a NetworkStream as it would not begin parsing until it filled an internal 4KB buffer or reached EOF.

其他库完全不使用 XmlReader 解决了这个问题.正如前面链接的问题中提到的,jabber-net 使用 Java XML 解析器的一个端口.我在搜索时发现的一个实现,Babel IM,使用它自己的简单的 XML 解析器.我不确定 agsXMPP 的作用.

Other libraries got around this by not using XmlReader at all. As mentioned in the previously linked question, jabber-net uses a port of a Java XML parser. An implementation I found while searching, Babel IM, uses its own simple XML parser. I'm not sure what agsXMPP does.

然而,随着 .NET 4.5 的发布和新的异步功能 XmlReader 显然得到了升级,现在可以做到真正的 异步解析.因此,我使用它来编写一个相当简单的 XMPP 客户端,该客户端可以连接到服务器并发送和接收消息.

However, with the release of .NET 4.5 and the new async features XmlReader apparently got an upgrade and can now do true async parsing. I've thus used it to hack together a fairly simple XMPP client that can connect to a server and send and receive messages.

然而,症结实际上似乎是断开与服务器的连接.在断开连接时,我通常只想 Dispose() 我的 XmlReader 实例和底层流.但是,Dispose() 实际上会抛出一个 InvalidOperationException 消息异步操作已经在进行中".如果你在异步时调用它......那么消息所说的.但是,由于 XMPP 的性质,我的 XmlReader 基本上一直在执行异步操作,因为它等待来自服务器的 XML 节进入管道.

The sticking point however, actually seems to be in disconnecting from the server. On disconnect I would normally just want to Dispose() of my XmlReader instance and the underlying streams. However, Dispose() will actually throw an InvalidOperationException with the message "An asynchronous operation is already in progress." if you call it when an async... well what the message says. However, because of the nature of XMPP, my XmlReader is basically constantly performing an async operation as it waits for XML stanzas from the server to come down the pipe.

似乎没有任何 方法 告诉它取消任何挂起的异步操作,以便我可以干净地Dispose().是否有比不尝试处理 XmlReader 更好的方法来处理这种情况? XMPP spec 指出服务器应该在断开连接时发送关闭 </stream:stream> 标记.我可以将此用作信号,以免尝试执行另一个异步读取,因为管道中不应该有任何其他内容,但无法保证这一点.

There do not appear to be any methods on the XmlReader that I could use to tell it to cancel any pending async operations so that I can Dispose() of it cleanly. Is there a better way to deal with this situation than simply not attempting to dispose of the XmlReader? The XMPP spec states that the server is supposed to send a closing </stream:stream> tag on disconnect. I could use this as a signal to not attempt to perform another async read as nothing else should be coming down the pipe, but there's no guarantee of this.

这里是一些示例代码.LongLivedTextStream 基本上模拟了一个开放的 NetworkStream,因为它永远不会到达 EOF 并且会阻塞直到至少可以读取 1 个字节.您可以将 XML 文本注入"其中,XmlReader 会很乐意解析这些文本,但尝试处理读取器会触发上述异常.

Here is some sample code to play with. LongLivedTextStream basically emulates an open NetworkStream in that it never reaches EOF and will block until at least 1 byte can be read. You can "inject" XML text into it which the XmlReader will happily parse, but trying to dispose of the reader will trigger the aforementioned exception.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

namespace Example
{
    class LongLivedTextStream : Stream
    {
        ManualResetEvent moarDatas = new ManualResetEvent(false);

        List<byte> data = new List<byte>();
        int pos = 0;

        public void Inject(string text)
        {
            data.AddRange(new UTF8Encoding(false).GetBytes(text));

            moarDatas.Set();
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            var bytes = GetBytes(count).ToArray();

            for (int i = 0; offset + i < buffer.Length && i < bytes.Length; i++)
            {
                buffer[offset + i] = bytes[i];
            }

            return bytes.Length;
        }

        private IEnumerable<byte> GetBytes(int count)
        {
            int returned = 0;

            while (returned == 0)
            {
                if (pos < data.Count)
                {
                    while (pos < data.Count && returned < count)
                    {
                        yield return data[pos];

                        pos += 1; returned += 1;
                    }
                }
                else
                {
                    moarDatas.Reset();
                    moarDatas.WaitOne();
                }
            }
        }

        #region Other Stream Members

        public override bool CanRead
        {
            get { return true; }
        }

        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return false; }
        }

        public override void Flush() { }

        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        public override long Position
        {
            get { throw new NotSupportedException(); }
            set { throw new NotSupportedException(); }
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        #endregion
    }

    public class Program
    {
        public static void Main(string[] args)
        {
            Test();
            Console.ReadLine();
        }

        public static async void Test()
        {
            var stream = new LongLivedTextStream();
            var reader = XmlReader.Create(stream, new XmlReaderSettings() { Async = true });

            var t = Task.Run(() =>
                {
                    stream.Inject("<root>");
                    Thread.Sleep(2000);
                    stream.Inject("value");
                    Thread.Sleep(2000);
                    stream.Inject("</root>");
                    Thread.Sleep(2000);

                    reader.Dispose(); // InvalidOperationException: "An asynchronous operation is already in progress."

                    Console.WriteLine("Disposed");
                });

            while (await reader.ReadAsync())
            {
                bool kill = false;

                switch (reader.NodeType)
                {
                    case XmlNodeType.Element:
                        Console.WriteLine("Start: " + reader.LocalName);
                        break;

                    case XmlNodeType.EndElement:
                        Console.WriteLine("End:   " + reader.LocalName);
                        //kill = true; // I could use a particular EndElement as a signal to not try another read
                        break;

                    case XmlNodeType.Text:
                        Console.WriteLine("Text:  " + await reader.GetValueAsync());
                        break;
                }

                if (kill) { break; }
            }
        }
    }
}

编辑

这个例子使用了一个实际的 NetworkStream 并表明如果我 Close()Dispose() 底层流的 ReadAsync()XmlReader 的调用没有按预期返回 false,而是继续阻塞.

This example uses an actual NetworkStream and shows that if I Close() or Dispose() of the underlying stream the ReadAsync() call on XmlReader does not return false as hoped, instead it continues to block.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

namespace Example
{
    public class Program
    {
        public static void Main(string[] args)
        {
            NetworkStream stream = null;

            var endpoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 50000);                                   

            var serverIsUp = new ManualResetEvent(false);
            var doneWriting = new ManualResetEvent(false);

            var t1 = Task.Run(() =>
            {
                var server = new TcpListener(endpoint);
                server.Start();

                serverIsUp.Set();

                var client = server.AcceptTcpClient();

                var writer = new StreamWriter(client.GetStream());

                writer.Write("<root>"); writer.Flush();
                Thread.Sleep(2000);
                writer.Write("value"); writer.Flush();
                Thread.Sleep(2000);
                writer.Write("</root>"); writer.Flush();
                Thread.Sleep(2000);

                doneWriting.Set();
            });

            var t2 = Task.Run(() =>
                {
                    doneWriting.WaitOne();

                    stream.Dispose();

                    Console.WriteLine("Disposed of Stream");
                });

            var t3 = Task.Run(async () =>
            {
                serverIsUp.WaitOne();                

                var socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
                socket.Connect(endpoint);

                stream = new NetworkStream(socket, true);

                var reader = XmlReader.Create(stream, new XmlReaderSettings() { Async = true });

                bool val;
                while (val = await reader.ReadAsync())
                {
                    bool kill = false;

                    switch (reader.NodeType)
                    {
                        case XmlNodeType.Element:
                            Console.WriteLine("Start: " + reader.LocalName);
                            break;

                        case XmlNodeType.EndElement:
                            Console.WriteLine("End:   " + reader.LocalName);
                            //kill = true; // I could use a particular EndElement as a signal to not try another read
                            break;

                        case XmlNodeType.Text:
                            Console.WriteLine("Text:  " + await reader.GetValueAsync());
                            break;
                    }

                    if (kill) { break; }
                }

                // Ideally once the underlying stream is closed, ReadAsync() would return false
                // we would get here and could safely dispose the reader, but that's not the case
                // ReadAsync() continues to block
                reader.Dispose();
                Console.WriteLine("Disposed of Reader");
            });

            Console.ReadLine();
        }
    }
}

推荐答案

尝试将手动 注入解析器.为此,您可能需要 NetworkStream 和解析器之间的适配器类,它将所有传入数据传递给解析器,但添加另一种方法来注入 </stream:stream>.当您调用该方法时,您需要注意不要在另一个节的中间,也许是通过将状态保持在解析器的输出端.

Try injecting a manual </stream:stream> into the parser. To do this, you may need an adapter class between the NetworkStream and the parser, which passes all of the incoming data to the parser but adds another method to inject the </stream:stream>. You'll need to be careful that you're not in the middle of another stanza when you call that method, perhaps by keeping state on the output side of the parser.

这篇关于使用挂起的异步读取处理 XmlReader的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆