两个线程,一个NetworkStream:这个线程安全有效地读写吗? [英] Two threads, one NetworkStream: is this thread safely and efficiently reading and writing?

查看:63
本文介绍了两个线程,一个NetworkStream:这个线程安全有效地读写吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述


我的程序首先设置一个ActionManager,它管理一个逐个完成作业的Action队列,在完成它们时从队列中删除已完成的作业。 ActionManager将在自己的线程中运行。

My program starts by setting up an ActionManager, which manages an Action queue that completes jobs one by one, removing completed jobs from its queue as it completes them. The ActionManager will run in its own thread.


然后,创建一个NetworkStreamManager,它将持续读取消息。读取一些数据后,NetworkStreamManager将触发一个事件DataArrival,它将作业传递给ActionManager的队列以处理接收的数据。

Then, a NetworkStreamManager is created, which will continuously read messages. Once some data is read, the NetworkStreamManager will fire an event DataArrival, which passes a job to the ActionManager's queue to handle the received data.


Main将输入一个无限循环,它可以工作,但也  直接 在这个主线程中调用
networkStreamManager.SendData()。

Main will then enter an infinite loop in which it does work, but also directly calls networkStreamManager.SendData() in this main thread.


我还有一些问题,除了这是否是一个正确编写的实现:

I have a few questions in addition to whether this a properly written implementation:



  1. Is it thread safe for the main thread and the ActionManager thread to SendData? I added a lock so that BeginWrite() could not be called concurrently.

>
我想调用SendData()来阻止线程继续执行程序,直到SendData完成。但是,因为我不希望NetworkStreamManager停止读取数据,所以我使用异步BeginWrite()在同步Write()上,因为
似乎Write()阻塞整个调用线程。这是一个正确的假设和设计选择吗?

I would like calling SendData() to stop the thread from progressing down the program until SendData is complete. However, because I did not want NetworkStreamManager to stop reading data, I used the asynchronous BeginWrite() over the synchronous Write(), since it seems that Write() blocks the entire calling thread. Is this a correct assumption and design choice?



请随意指出我的实施中的任何其他问题!

Please feel free to point out any other issues with my implementation too!


MainClass.cs

public class MainClass
{
	private ActionManager actionManager;
	private TcpClient tcp;
	private NetworkStreamManager streamManager;
	
	static void Main(string[] args)
	{
		// ActionManager's constructor will Start a new Thread
		actionManager = new ActionManager();
		
		tcp = new TcpClient();
		tcp.Connect(someIPaddr, somePort);
		
		streamManager = new NetworkStreamManager(tcp);
		streamManager.DataAvailable += DataArrival;
		
		while (true)
		{
			// ... Do stuff
			
			// Directly SendData without placing an action
			// in ActionManager's queue
			streamManager.SendData(Encoder.ASCII.GetBytes("KeepAlive"));
			
			// We cannot do cool stuff until SendData has completed
			// ... Do some cool stuff
		}
	}
	
	private void DataArrival(DataAvailableEventArgs e)
	{
		actionManager.AddAction(() =>
		{
			// ... Do stuff with the received data
			
			// Respond acknowledging completion
			streamManager.SendData(Encoder.ASCII.GetBytes("Complete"));
		});
	}
}




NetworkStreamManager.cs




NetworkStreamManager.cs

public class NetworkStreamManager
{
	public event DataAvailableEventHandler DataAvailable;

	private readonly NetworkStream networkStream;
	private byte[] data = new byte[256];

	private readonly object writeLock = new object();

	public NetworkStreamManager(NetworkStream networkStream)
	{
		if (networkStream == null)
		{
			throw new ArgumentNullException("NetworkStream is null");
		}
		this.networkStream = networkStream;
		WaitForNext();
	}

	protected void OnDataAvailable(DataAvailableEventArgs e)
	{
		var handler = DataAvailable;
		if (handler != null)
		{
			handler(e);
		}
	}

	protected void WaitForNext()
	{
		networkStream.BeginRead(
			data,
			0,
			data.Length,
			new AsyncCallback(ReadCallback),
			networkStream);
	}

	private void ReadCallback(IAsyncResult ar)
	{
		NetworkStream myNetworkStream = (NetworkStream) ar.AsyncState;
		int bytesRead = myNetworkStream.EndRead(ar);
		
		// Raise an event that contains the buffer
		OnDataAvailable(new DataAvailableEventArgs(
			data,
			bytesRead));
		WaitForNext();
	}

	public void SendData(byte[] data)
	{
		lock (writeLock)
		{
			ManualResetEvent writeComplete = new ManualResetEvent(false);
			networkStream.BeginWrite(
				data,
				0,
				data.Length,
				SendDataCallback,
				new TcpWriteState(networkStream, writeComplete));

			// Block until writeComplete has finished, and time
			// out after 1 second
			if (!writeComplete.WaitOne(1000))
			{
				// Handle write timeout
			}
		}
	}

	private void SendDataCallback(IAsyncResult ar)
	{
		TcpWriteState state = (TcpWriteState) ar.AsyncState;
		NetworkStream myNetworkStream = state.networkStream;
		ManualResetEvent writeComplete = state.manualResetEvent;

		myNetworkStream.EndWrite(ar);
		writeComplete.Set();
	}
}









推荐答案

通常,让多个线程同时读/写同一个流是不安全的。最大的问题是将它们同步起来非常困难。大多数网络协议都是发送/接收循环。如果你有多个线程读取数据
而其他线程正在编写,则无法轻易保证此行为。 示例

In general it is not safe to have multiple threads reading/writing the same stream at the same time. The biggest issue is that it would be very hard to sync them up. Most network protocols are a send/receive loop. If you have multiple threads reading data while other threads are writing you cannot easily guarantee this behavior.  Example

- 客户端开始发送消息但尚未发送所有消息

- 引发DataAvailable并发送一部分消息/>
- 操作被推入队列,处理可能会或可能不会开始。
- 客户端完成发送消息

- DataAvailable再次被提升,其余的消息

- 行动被推入你的队列,处理可能会或可能不会开始。
- 更多数据来自其他人

- The client starts sending a message but hasn't sent it all yet
- DataAvailable is raised with a portion of the message that was sent
- Action is pushed into your queue and processing may or may not begin
- Client finishes sending message
- DataAvailable is raised again with the rest of the message
- Action is pushed into your queue and processing may or may not begin
- More data arrives from somebody else

在操作被推入队列后的任何时刻,您的逻辑可能会运行,可能会也可能没有所有数据。此外,它会发送完整的回复。客户只需要1即可获得。在此之后的某个时间运行另一个操作并且
检索到此时尚未读取的数据,然后将完成发送给不期望它的客户端。同时,排队的第三个动作现在读取已经读取的数据并可能产生错误。它
也可能向不期望它的客户端发送完整的消息。

At any point after the action gets pushed into your queue your logic may run which may or may no have all the data. Additionally it'll send a complete response. The client, which expects only 1 will get it. Sometime after that the other action runs and it retrieves the data that has not been read to this point and then sends a completion to a client that isn't expecting it. Meanwhile the 3rd action that would have gotten queued now reads data that has already been read and probably generates an error. It'll also potentially send a complete message to a client that isn't expecting it.

通常,高性能TCP服务器为每个唯一客户端分配一个单独的线程。所有处理都由该客户端的单个线程完成。这消除了这些问题,因为线程负责所有工作并且可以确保
发送/接收管道。如果客户端经常连接和断开连接(如在HTTP中),则可以使用线程池来减少开销。一旦请求完成,就可以返回线程。

In general, high performance TCP servers allocate a separate thread to each unique client. All the processing is done by a single thread for that client. This eliminates these kinds of issues as the thread is responsible for all the work and can ensure the send/receive pipeline. If clients connect and disconnect frequently (like in HTTP) then a pool of threads can be used to cut down the overhead. Once a request has been completed the thread can be returned.

如果你真的需要实现异步TCP,那么你将不得不在客户端和服务器上改变你的方法。服务器可以创建一个线程来开始处理,但它必须发回一个确认包,让客户知道
已收到数据。然后,客户端可以在等待响应时继续。要将响应返回给客户端,它确实需要拥有自己的TCP服务器以允许通知通过。由于TCP通常使用的固有发送/接收协议
,您不能简单地在原始套接字上发送回数据包,因为客户端可能没有监听它(同步),实际上可能正在尝试发送更多数据到服务器。

If you really need to implement async TCP then you're going to have to change your approach on both the client and the server. The server can create a thread to start processing but it'll have to send back a confirmation packet to let the client know it has received the data. The client can then continue on while it waits for the response. To get the response back to the client it would really need to have its own TCP server to allow the notification to come through. Because of the inherent send/receive protocol that TCP generally uses you cannot simply send back a packet on the original socket as the client may not be listening for it (sync) and may, in fact, be trying to send more data to the server.


这篇关于两个线程,一个NetworkStream:这个线程安全有效地读写吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆