多个线程到同一个 TCP 套接字 [英] Multiple Threads to the same TCP Socket

查看:56
本文介绍了多个线程到同一个 TCP 套接字的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是 .NET 开发人员,并且是 Socket 编程的新手.

I am a .NET Developer and am new to Socket Programming.

我编写了一个程序,它使用 .NET 套接字库使用 TCP 套接字向客户端发送一些数据.

I wrote a program which sends some data to the client using a TCP Socket using .NET Socket Library.

客户端需要每 40 秒发送一次自定义保持活动消息以保持连接活动.

The client requires a custom Keep Alive Message every 40 secs to keep the connection alive.

所以,我编写了与客户端建立连接的主程序.在这个主程序中,我创建了一个线程并传递了之前创建的 Socket 类的实例.该线程负责向客户端发送保活消息,而主线程负责发送数据.

So, I wrote main program that establishes a connection with the client. With-in this main program, I create a thread and pass the instance of Socket Class that was created earlier. This thread is responsible for sending the keep alive messages to the client while the main thread is responsible for sending the data.

一切都很好.但是如果由于某种原因套接字连接超时,程序将永远无法恢复?我将退出线程和建立新连接的逻辑放在一起,但它总是会给出错误 - '与主机的连接已中止'或类似的东西.

It all works good. But if for some reason socket connection is time out, the program will never recover? I put the logic for both the threads to exit and new connnection to be established but it will always give an error - ' The connection with the host is aborted' or something similar.

我做错了吗?

我需要将两个线程连接到同一个套接字.一个线程负责发送数据,另一个线程负责发送保活消息.最好的方法应该是什么?

I need to have two threads connected to the same socket. One thread is responsible for sending the data and other is responsible for sending the keep alive messages. What should be the best approach for this?

不,我不想使用同一个套接字.我摆脱了 for 循环和 clntSock.close()...

No, I am not trying to use the same socket. I break away from the for loop and clntSock.close()...

代码如下:

我有调用 handleClient 的 mainThread.handleClient 创建另一个线程.

I have mainThread that calls handleClient. handleClient Creates another thread.

class DispatchLoop
{
    ILogger logger;
    TcpListener listener;
    IProtocolFactory protoFactory;

    public DispatchLoop(TcpListener listener, ILogger logger, IProtocolFactory protoFactory)
    {
        this.logger = logger;
        this.listener = listener;
        this.protoFactory = protoFactory;
    }

    public void mainThread()
    {
        // Run forever, accepting and handling each connection
        for (; ; )
        {
            try
            {
                Socket clntSock = listener.AcceptSocket(); // Block waiting for connection
                PoolDispatcher._stopper.Reset();
                clntSock.ReceiveTimeout = 10000;
                IProtocol protocol = protoFactory.createProtocol(clntSock, logger);
                protocol.handleClient();
            }
            catch (SocketException se)
            {
                logger.writeEntry("(Run Dispatcher) Exception = " + se.Message);
            }
        }
    }
}

    public void handleClient()
    {
        entry.Add("Main Thread Entered : Client address and Port = " + clntSock.RemoteEndPoint + ", Thread Number = " + Thread.CurrentThread.GetHashCode());

        //Kick Starting Keep Alive Thread
        KeepAlive ka = new KeepAlive(clntSock, logger);
        Thread thread = new Thread(new ThreadStart(ka.SendKeepAlive));
        thread.Start();
        try
        {
            int recvMsgSize; // Size of received message
            byte[] rcvBuffer = new byte[BUFSIZE]; // Receive buffer
            byte[] messageBuffer = new byte[1024];
            XDocument _messageXDoc;
            FlightInfoExtended _flightInfoExtended;
            try
            {
                LogEntry(entry);
                for (; ; )
                {
                    try
                    {
                        //Read from the Queue 
                        var _queue = new IBMQueue();
                        var message = _queue.ReceiveMessage();

                        if (message.Length > 0)
                        {
                            entry.Add("Sending the GOS Message to the client : " + message);
                            messageBuffer = Encoding.ASCII.GetBytes(message);

                            if (clntSock.Connected)
                            {
                                clntSock.Send(messageBuffer, 0, messageBuffer.Length, SocketFlags.None);
                                recvMsgSize = clntSock.Receive(rcvBuffer, 0, rcvBuffer.Length, SocketFlags.None);
                                SaveGOSMessage(_auditMessage);
                            }
                            else
                            {
                                PoolDispatcher._stopper.Set();
                                LogFailureStatus("No Socket Connection");
                                Thread.Sleep(30000);
                                break;
                            }
                        }
                    }
                    catch (SocketException se)
                    {
                        PoolDispatcher._stopper.Set();
                        LogFailureStatus(se.Message);
                        Thread.Sleep(30000);
                        break;
                    }
                    catch (Exception e)
                    {
                    }
                    LogEntry(entry);
                }
            }
            catch (Exception se)
            {
                entry.Add(String.Format("{0}: {1}", se.Source, se.Message));
            }
        }
        catch (Exception se)
        {
            entry.Add(String.Format("{0}: {1}", se.Source, se.Message));
        }

        clntSock.Close();

        logger.writeEntry(entry);
    }



public class KeepAlive
{
    ArrayList entry;
    private ILogger logger;
    private Socket clntSock;
    public const int BUFSIZE = 1024;

    public KeepAlive(Socket clntSock, ILogger logger)
    {
        this.logger = logger;
        this.clntSock = clntSock;
        entry = new ArrayList();
    }

    void LogEntry(ArrayList _entry)
    {
        logger.writeEntry(_entry);
        entry.Clear();
    }

    public void SendKeepAlive()
    {
        entry.Add("Keep Alive Thread Entered : Client address and Port = " + clntSock.RemoteEndPoint + ", Thread Number = " + Thread.CurrentThread.GetHashCode());

        var message= "Some Keep Alive Message";

        try
        {
            byte[] messageBuffer = new byte[1024];
            LogEntry(entry);
            for (; ; )
            {
                //Check if main thread died
                if ( PoolDispatcher._stopper.WaitOne(100, false))  
                {                                 
                    break;
                }

                if (clntSock.Connected)
                {
                    entry.Add("Sending the Keep Alive Message... " + message);
                    messageBuffer = Encoding.ASCII.GetBytes(message);
                    clntSock.Send(messageBuffer, 0, messageBuffer.Length, SocketFlags.None);
                }
                else
                {
                    entry.Add("Socket Connection is not active. Keep Alive not sent");
                    break;
                }
                LogEntry(entry);
                Thread.Sleep(30000);
            }
        }
        catch (SocketException se)
        {
            entry.Add(String.Format("{0}: {1}", se.ErrorCode, se.Message));
        }
        catch (ObjectDisposedException ode)
        {
            entry.Add("Connection to the socket lost. Child Thread Aborted");
        }
        LogEntry(entry);
    }
}

推荐答案

它很容易实现,只需将服务器和客户端这两个模块集成在单线程中,并为每个实例生成读写线程,每个 Socket 都有 IP 地址(不是同一个端口)

Its easy to approach just integrate the two module .i.e server and Client in single thread and spawn read write thread for each instance,for each Socket with by IP address (not by same Port)

http://msdn.microsoft.com/en-us/library/system.net.sockets.tcpclient%28v=VS.90%29.aspx&http://msdn.microsoft.com/en-us/library/system.net.sockets.tcplistener%28v=VS.90%29.aspx

单独使用它效果很好.我完成了这件事,希望对你有帮助.

use this alone it works perfect. I completed this thing,Hope its helps u.

这篇关于多个线程到同一个 TCP 套接字的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆