谈及异步并行接口不仅并发使用TPL非常密集的应用 [英] Turning async socket Parallel and not only Concurrent in very intensive application using TPL

查看:181
本文介绍了谈及异步并行接口不仅并发使用TPL非常密集的应用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在写一个使用套接字的应用程序,这将是非常密集的话,我真的需要利用一切的核心,我们有我们的大型服务器。我看到的问题(<一个href=\"http://stackoverflow.com/questions/4020607/how-to-using-threadpool-to-run-socket-thread-parallel\">how使用线程池来运行套接字线程并行?)在这里有计算器答案只有一个指向该的 MSDN样品

I'm writing an application that uses Socket and it will be very intensive then I really need use every core we have in our big server. I see the question ( how to using ThreadPool to run socket thread parallel ? ) here in stackoverflow there is only one answer that point to this MSDN Sample.

但我认为这点不仅是如何使它的并行而非并行,这里有人问的如何CPU密集型正在打开一个套接字和它的外观是非常密集,这里有人告诉其不要帮助<一个href=\"http://stackoverflow.com/questions/5018897/tpl-taskfactory-fromasync-vs-tasks-with-blocking-methods/5834381#5834381\">TPL TaskFactory.FromAsync VS与封闭方法任务,有人教如何在这里做蒙山TaskFactory.FromAsync(的是否有包装现有的BeginXXX / EndXXX异步模式
方法分为异步任务?
)。

But I think it point only how to make it Concurrent and not Parallel, here is someone asking How cpu intensive is opening a socket and its looks be very intensive, someone here tell its dont help TPL TaskFactory.FromAsync vs Tasks with blocking methods and someone teach how to do it here whith TaskFactory.FromAsync (Is there a pattern for wrapping existing BeginXXX/EndXXX async methods into async tasks?).

我怎么能保持套接字操作并行和高性能,如果交易蒙山插座
像断线,连一半插座和消息边界问题是在正常异步方式头疼。如何处理这如果放在一起TPL和任务。

How can I keep socket operations parallel and performant and if deal whith socket problems like disconnections, half connected sockets and message boundaries are a headache in normal async way. How to deal with it if its put together TPL and Task.

推荐答案

看到:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace skttool
{
    public class StateObject
    {
        public Socket workSocket = null;
        public const int BufferSize = 1024;
        public byte[] buffer = new byte[BufferSize];
        public int bytesRead = 0;
        public StringBuilder sb = new StringBuilder();
    }

    public class tool
    {
        //-------------------------------------------------
        private ManualResetEvent evtConnectionDone = new ManualResetEvent(false);
        private Socket skttool = null;
        private bool running = false;
        private StateObject state = null;
        //-------------------------------------------------
        toolConfig _cfg;
        public tool(toolConfig cfg)
        {
            _cfg = cfg;
        }
        //-------------------------------------------------
        public void socketListeningSet()
        {
            IPEndPoint localEndPoint;
            Socket skttool;
            byte[] bytes = new Byte[1024];
            localEndPoint = new IPEndPoint(IPAddress.Any, _cfg.addressPort);
            skttool = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            skttool.Bind(localEndPoint);
            skttool.Listen(_cfg.maxQtdSockets);
        }
        //-------------------------------------------------
        public void start()
        {
            running = true;
            Task T1 = Task.Factory.StartNew(socketListeningSet);
            T1.ContinueWith(prev =>
            {
                while (running)
                {
                    evtConnectionDone.Reset();
                    Task<Socket> accepetChunk = Task<Socket>.Factory.FromAsync(
                                                                       skttool.BeginAccept,
                                                                       skttool.EndAccept,
                                                                       accept,
                                                                       skttool,
                                                                       TaskCreationOptions.AttachedToParent);
                    accepetChunk.ContinueWith(accept, TaskContinuationOptions.NotOnFaulted | TaskCreationOptions.AttachedToParent);
                    evtConnectionDone.WaitOne();
                }
            });
        }
        //-------------------------------------------------
        void accept(Task<Socket> accepetChunk)
        {
            state = new StateObject();
            evtConnectionDone.Set();
            state.workSocket = accepetChunk.Result;
            Task<int> readChunk = Task<int>.Factory.FromAsync(
                                                       state.workSocket.BeginReceive,
                                                       state.workSocket.EndReceive,
                                                       state.buffer,
                                                       state.bytesRead,
                                                       state.buffer.Length - state.bytesRead,
                                                       null,
                                                       TaskCreationOptions.AttachedToParent);
            readChunk.ContinueWith(read, TaskContinuationOptions.NotOnFaulted | TaskCreationOptions.AttachedToParent);
        }
        //-------------------------------------------------
        void read(Task<int> readChunk)
        {
            state.bytesRead += readChunk.Result;
            if (readChunk.Result > 0 && state.bytesRead < state.buffer.Length)
            {
                read();
                return;
            }
            _data = doTask(_data);
            Task<int> sendChunk = Task<int>.Factory.FromAsync(
                                                       state.workSocket.BeginSend,
                                                       state.workSocket.EndSend,
                                                       state.buffer,
                                                       state.bytesRead,
                                                       state.buffer.Length - state.bytesRead,
                                                       null,
                                                       TaskCreationOptions.AttachedToParent);
            sendChunk.ContinueWith(send, TaskContinuationOptions.NotOnFaulted | TaskCreationOptions.AttachedToParent);
        }
        //-------------------------------------------------
        void send(Task<int> readChunk)
        {
            state.workSocket.Shutdown(SocketShutdown.Both);
            state.workSocket.Close();
        }
        //-------------------------------------------------
        byte[] doTask(byte[] data)
        {
            return Array.Reverse(data);
        }
        //-------------------------------------------------
    }
}

这篇关于谈及异步并行接口不仅并发使用TPL非常密集的应用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆