C#全双工异步命名管道.NET [英] c# Full Duplex Asynchronous Named Pipes .NET

查看:69
本文介绍了C#全双工异步命名管道.NET的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试在两台不同的计算机(仅)上实现全双工客户端-服务器通信方案,其中每个端点(客户端或服务器)可以随时异步(非阻塞管道)发送内容,另一端将拾起并阅读.

I am trying to achieve a full-duplex client-server communication scheme, on 2 different machines (only), where each end-point (client or server) can send stuff at any time, asynchronously (non-blocking pipe), and the other end will pick it up and read it.

我不希望答案涉及到除命名管道之外的其他任何技术,我知道其他技术,但我想回答这个特定问题. (我已经在不同的论坛上多次问过这个问题,并且我不断看到建议使用其他技术的回复.我认为这很粗鲁吗?)

I don't want answers referring me to any other technology besides named pipes, I know about the other technologies, but I want an answer to this particular question. (I've seen this question posted so many times on different forums, and I keep seeing responses advising to use some other technology. I think this borders on rude?)

我已经读到命名管道只能是单向的,否则它们会锁定,但是我猜这可能是错误的.我认为管道是基于套接字的,而且我无法想象基础套接字只会是单向的.

I've read that Named Pipes have to be one-direction only or they lock up, but I'm guessing that's probably wrong. I think pipes are socket-based, and I can't imagine the underlying socket would be one-way only.

这个问题的任何答案都需要解决这些问题,才能真正有用:

Any answers to this question need to address these issues for it to be really useful:

  1. 答案需要解决异步管道问题,我不能使用同步解决方案.
  2. 答案需要证明或允许管道保持打开状态.我厌倦了阅读打开管道,传输字符串然后立即关闭管道的示例.我想要一个答案,假设管道保持打开状态,并在任意时间传输大量垃圾,并不断重复.没有挂起.
  3. 基于C#的解决方案

对于声音的苛刻和流鼻涕感到很抱歉,但是经过数天的互联网搜索之后,我仍然没有找到一个很好的例子,并且我不想使用WFC.我敢肯定,如果您知道此答案的细节并很好地回答,那么该主题将是未来时代的真正赢家.如果知道了,我会自己张贴答案.

I'm sorry to sound demanding and snotty, but after days of scouring the internet I still haven't found a good example, and I don't want to use WFC. If you know the details of this answer and answer it well, this topic will be a real winner for ages to come, I'm sure. I will post the answer myself if I figure it out.

如果您要编写并说您需要使用两个管道",请说明为什么,以及您如何知道这是正确的,因为我对此一无所知就是这种情况.

If you're about to write and say "You need to use two pipes", please explain why, and how you know this is true, because nothing I've read about this explains why this is the case.

谢谢!

推荐答案

您不必使用两个管道.我在网上发现了很多答案,指出您需要使用两个管道.我挖了一下,熬夜,反复尝试,然后想出方法,这非常简单,但是您必须正确处理所有事情(尤其是按正确的调用顺序进行操作),否则将无法正常工作.另一个技巧是始终确保您有未完成的读取调用,否则它将锁定.在您知道有人正在读书之前,不要写东西.除非您先设置了事件,否则不要发起读取呼叫.那样的东西.

You don't have to use two pipes. I found a LOT of answers on the net that state you need to use two pipes. I dug around, stayed up all night, tried and tried again, and figured out how to do it, it's super simple, but you gotta get everything right (especially get things in the right calling order), or it just won't work. Another trick is to always ensure you have a read call outstanding, or it will lock up too. Don't write before you know somebody is reading. Don't start a read call unless you have the event set up first. That kind of thing.

这是我正在使用的管道类.它可能不够强大,无法处理管道错误,关闭和溢出.

here's the pipe class I'm using. It's probably not robust enough to deal with pipe errors, closures, and overflows.

好吧,我不知道这里出了什么问题,但是格式略有偏离! vvvv

Okay I have no idea what's wrong here, but the formatting is slightly off! vvvv

namespace Squall
{
    public interface PipeSender
    {
        Task SendCommandAsync(PipeCommandPlusString pCmd);
    }

    /******************************************************************************
     * 
     * 
     * 
     * 
     ******************************************************************************/
    public class ClientPipe : BasicPipe
    {
        NamedPipeClientStream m_pPipe;

        public ClientPipe(string szServerName, string szPipeName)
            : base("Client")
        {
            m_szPipeName = szPipeName; // debugging
            m_pPipe = new NamedPipeClientStream(szServerName, szPipeName, PipeDirection.InOut, PipeOptions.Asynchronous);
            base.SetPipeStream(m_pPipe); // inform base class what to read/write from
        }

        public void Connect()
        {
            Debug.WriteLine("Pipe " + FullPipeNameDebug() + " connecting to server");
            m_pPipe.Connect(); // doesn't seem to be an async method for this routine. just a timeout.
            StartReadingAsync();
        }

        // the client's pipe index is always 0
        internal override int PipeId() { return 0; }
    }

    /******************************************************************************
     * 
     * 
     * 
     * 
     ******************************************************************************/
    public class ServerPipe : BasicPipe
    {
        public event EventHandler<EventArgs> GotConnectionEvent;

        NamedPipeServerStream m_pPipe;
        int m_nPipeId;

        public ServerPipe(string szPipeName, int nPipeId)
            : base("Server")
        {
            m_szPipeName = szPipeName;
            m_nPipeId = nPipeId;
            m_pPipe = new NamedPipeServerStream(
                szPipeName,
                PipeDirection.InOut,
                NamedPipeServerStream.MaxAllowedServerInstances,
                PipeTransmissionMode.Message,
                PipeOptions.Asynchronous);
            base.SetPipeStream(m_pPipe);
            m_pPipe.BeginWaitForConnection(new AsyncCallback(StaticGotPipeConnection), this);
        }

        static void StaticGotPipeConnection(IAsyncResult pAsyncResult)
        {
            ServerPipe pThis = pAsyncResult.AsyncState as ServerPipe;
            pThis.GotPipeConnection(pAsyncResult);
        }

        void GotPipeConnection(IAsyncResult pAsyncResult)
        {
            m_pPipe.EndWaitForConnection(pAsyncResult);

            Debug.WriteLine("Server Pipe " + m_szPipeName + " got a connection");

            if (GotConnectionEvent != null)
            {
                GotConnectionEvent(this, new EventArgs());
            }

            // lodge the first read request to get us going
            //
            StartReadingAsync();
        }

        internal override int PipeId() { return m_nPipeId; }
    }

    /******************************************************************************
     * 
     * 
     * 
     * 
     ******************************************************************************/

    public abstract class BasicPipe : PipeSender
    {
        public static int MaxLen = 1024 * 1024; // why not
        protected string m_szPipeName;
        protected string m_szDebugPipeName;

        public event EventHandler<PipeEventArgs> ReadDataEvent;
        public event EventHandler<EventArgs> PipeClosedEvent;

        protected byte[] m_pPipeBuffer = new byte[BasicPipe.MaxLen];

        PipeStream m_pPipeStream;

        public BasicPipe(string szDebugPipeName)
        {
            m_szDebugPipeName = szDebugPipeName;
        }

        protected void SetPipeStream(PipeStream p)
        {
            m_pPipeStream = p;
        }

        protected string FullPipeNameDebug()
        {
            return m_szDebugPipeName + "-" + m_szPipeName;
        }

        internal abstract int PipeId();

        public void Close()
        {
            m_pPipeStream.WaitForPipeDrain();
            m_pPipeStream.Close();
            m_pPipeStream.Dispose();
            m_pPipeStream = null;
        }

        // called when Server pipe gets a connection, or when Client pipe is created
        public void StartReadingAsync()
        {
            Debug.WriteLine("Pipe " + FullPipeNameDebug() + " calling ReadAsync");

            // okay we're connected, now immediately listen for incoming buffers
            //
            byte[] pBuffer = new byte[MaxLen];
            m_pPipeStream.ReadAsync(pBuffer, 0, MaxLen).ContinueWith(t =>
            {
                Debug.WriteLine("Pipe " + FullPipeNameDebug() + " finished a read request");

                int ReadLen = t.Result;
                if (ReadLen == 0)
                {
                    Debug.WriteLine("Got a null read length, remote pipe was closed");
                    if (PipeClosedEvent != null)
                    {
                        PipeClosedEvent(this, new EventArgs());
                    }
                    return;
                }

                if (ReadDataEvent != null)
                {
                    ReadDataEvent(this, new PipeEventArgs(pBuffer, ReadLen));
                }
                else
                {
                    Debug.Assert(false, "something happened");
                }

                // lodge ANOTHER read request
                //
                StartReadingAsync();

            });
        }

        protected Task WriteByteArray(byte[] pBytes)
        {
            // this will start writing, but does it copy the memory before returning?
            return m_pPipeStream.WriteAsync(pBytes, 0, pBytes.Length);
        }

        public Task SendCommandAsync(PipeCommandPlusString pCmd)
        {
            Debug.WriteLine("Pipe " + FullPipeNameDebug() + ", writing " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString());
            string szSerializedCmd = JsonConvert.SerializeObject(pCmd);
            byte[] pSerializedCmd = Misc.StringToBytes(szSerializedCmd);
            Task t = WriteByteArray(pSerializedCmd);
            return t;
        }
    }

    /******************************************************************************
     * 
     * 
     * 
     * 
     ******************************************************************************/

    public class PipeEventArgs
    {
        public byte[] m_pData;
        public int m_nDataLen;

        public PipeEventArgs(byte[] pData, int nDataLen)
        {
            // is this a copy, or an alias copy? I can't remember right now.
            m_pData = pData;
            m_nDataLen = nDataLen;
        }
    }

    /******************************************************************************
     * if we're just going to send a string back and forth, then we can use this
     * class. It it allows us to get the bytes as a string. sort of silly.
     ******************************************************************************/

    [Serializable]
    public class PipeCommandPlusString
    {
        public string m_szCommand;  // must be public to be serialized
        public string m_szString;   // ditto

        public PipeCommandPlusString(string sz, string szString)
        {
            m_szCommand = sz;
            m_szString = szString;
        }

        public string GetCommand()
        {
            return m_szCommand;
        }

        public string GetTransmittedString()
        {
            return m_szString;
        }
    }
}

这是我的管道测试,在一个进程上运行.我也检查了它在两个进程上的运行情况

and here is my pipe test, running on one process. It runs on two processes too, I checked

namespace NamedPipeTest
{
    public partial class Form1 : Form
    {
        SynchronizationContext _context;
        Thread m_pThread = null;
        volatile bool m_bDieThreadDie;
        ServerPipe m_pServerPipe;
        ClientPipe m_pClientPipe;

        public Form1()
        {
            InitializeComponent();
        }

        private void Form1_Load(object sender, EventArgs e)
        {
            _context = SynchronizationContext.Current;

            m_pServerPipe = new ServerPipe("SQUALL_PIPE", 0);
            m_pServerPipe.ReadDataEvent += M_pServerPipe_ReadDataEvent;
            m_pServerPipe.PipeClosedEvent += M_pServerPipe_PipeClosedEvent;

            // m_pThread = new Thread(StaticThreadProc);
            // m_pThread.Start( this );
        }

        private void M_pServerPipe_PipeClosedEvent(object sender, EventArgs e)
        {
            Debug.WriteLine("Server: Pipe was closed, shutting down");

            // have to post this on the main thread
            _context.Post(delegate
            {
                Close();
            }, null);
        }

        private void M_pServerPipe_ReadDataEvent(object sender, PipeEventArgs e)
        {
            // this gets called on an anonymous thread

            byte[] pBytes = e.m_pData;
            string szBytes = Misc.BytesToString(pBytes, e.m_pData.Length);
            PipeCommandPlusString pCmd = JsonConvert.DeserializeObject<PipeCommandPlusString>(szBytes);
            string szValue = pCmd.GetTransmittedString();

            if (szValue == "CONNECT")
            {
                Debug.WriteLine("Got command from client: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString() + ", writing command back to client");
                PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("SERVER", "CONNECTED");
                // fire off an async write
                Task t = m_pServerPipe.SendCommandAsync(pCmdToSend);
            }
        }

        static void StaticThreadProc(Object o)
        {
            Form1 pThis = o as Form1;
            pThis.ThreadProc();
        }

        void ThreadProc()
        {
            m_pClientPipe = new ClientPipe(".", "SQUALL_PIPE");
            m_pClientPipe.ReadDataEvent += PClientPipe_ReadDataEvent;
            m_pClientPipe.PipeClosedEvent += M_pClientPipe_PipeClosedEvent;
            m_pClientPipe.Connect();

            PipeCommandPlusString pCmd = new PipeCommandPlusString("CLIENT", "CONNECT");
            int Counter = 1;
            while (Counter++ < 10)
            {
                Debug.WriteLine("Counter = " + Counter);
                m_pClientPipe.SendCommandAsync(pCmd);
                Thread.Sleep(3000);
            }

            while (!m_bDieThreadDie)
            {
                Thread.Sleep(1000);
            }

            m_pClientPipe.ReadDataEvent -= PClientPipe_ReadDataEvent;
            m_pClientPipe.PipeClosedEvent -= M_pClientPipe_PipeClosedEvent;
            m_pClientPipe.Close();
            m_pClientPipe = null;
        }

        private void M_pClientPipe_PipeClosedEvent(object sender, EventArgs e)
        {
            // wait around for server to shut us down
        }

        private void PClientPipe_ReadDataEvent(object sender, PipeEventArgs e)
        {
            byte[] pBytes = e.m_pData;
            string szBytes = Misc.BytesToString(pBytes, e.m_nDataLen);
            PipeCommandPlusString pCmd = JsonConvert.DeserializeObject<PipeCommandPlusString>(szBytes);
            string szValue = pCmd.GetTransmittedString();

            Debug.WriteLine("Got command from server: " + pCmd.GetCommand() + "-" + pCmd.GetTransmittedString());

            if (szValue == "CONNECTED")
            {
                PipeCommandPlusString pCmdToSend = new PipeCommandPlusString("CLIENT", "DATA");
                m_pClientPipe.SendCommandAsync(pCmdToSend);
            }
        }

        private void Form1_FormClosing(object sender, FormClosingEventArgs e)
        {
            if (m_pThread != null)
            {
                m_bDieThreadDie = true;
                m_pThread.Join();
                m_bDieThreadDie = false;
            }

            m_pServerPipe.ReadDataEvent -= M_pServerPipe_ReadDataEvent;
            m_pServerPipe.PipeClosedEvent -= M_pServerPipe_PipeClosedEvent;
            m_pServerPipe.Close();
            m_pServerPipe = null;

        }
    }
}

这篇关于C#全双工异步命名管道.NET的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆