Performance of Boost UDP Sockets - Low throughput?

Problem description

I am having some trouble with the performance of Boost ASIO UDP, or at least I think so.

I have put together a simple client/server application to illustrate:

  1. The client starts sending large (65K) UDP packets to the server, slowly at first.
  2. The server immediately sends back an ACK packet whenever a packet is received.
  3. The client monitors the rate at which these ACK packets arrive and sends outgoing packets slightly faster than that, so that the speed picks up.

(Packet loss, resending, etc. are not a concern in the sample code.)

When I originally wrote this in C#, the client quickly reached the limit of the NIC, about 125 MB/s. After rewriting the same code in C++ with Boost, throughput stalls at under 2 MB/s.

CPU usage for both client and server is 1-2%. There are no memory issues. Both server and client run on localhost. The environment is Windows 10, VS 2013, Boost 1.60.

I have tried using the async send/receive methods of Boost, but that did not seem to help. (Related question: "How to increase throughput of Boost ASIO, UDP client application".)

Is there a problem related to two threads sending/receiving from the same Boost socket? (Related question: "C++ Socket Server - Unable to saturate CPU".)

PS - I started programming in C++ two months ago, so there might be a really stupid mistake here somewhere, but I am unable to see exactly where. Any help or thoughts would be greatly appreciated.

CLIENT:

#pragma once
#include <boost/asio.hpp>
#include <boost/chrono.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include <chrono>
#include <iostream>
#include <memory>
#include <string>

#define EXPOSE_IN_DLL __declspec(dllexport)

namespace dummy{

    EXPOSE_IN_DLL class DummyClient
    {
        public:

            // How fast the client is receiving acks - e.g. every 200 ms.
            double MillisecondsPerAck = 200; 

            //How fast the client should be sending packages (meaning - how long it should sleep between packages)
            long long MinimumOfMillisecondsToUsePerSentPackage = 200; 

            //How often the code should throttle (calculate new value for 'MinimumOfMillisecondsToUsePerSentPackage')
            int intervalMilliseconds = 500;

            //How much faster we should send than receive (how fast we should increase the speed)
            const double ThrottleAgressiveness = 0.7;

            //Counters
            int AcksSinceLastTime = 0;
            int AcksLastTime = 0;
            int PackagesSent = 0;
            int AcksReceived = 0;
            long long BytesSentAndAcked = 0;

            //Size of UDP Package to send. IP Layer (NIC) will break larger packages into smaller ones (MTU).
            static const int PacketSize = 65000; 

            std::shared_ptr<boost::asio::io_service> io_service;
            std::shared_ptr<boost::asio::ip::udp::socket> socket;
            std::shared_ptr<boost::asio::ip::udp::endpoint> udpEndPoint;

            EXPOSE_IN_DLL DummyClient()
            {
                // This thread runs the send loop (it also starts the receiver and throttle threads)
                boost::thread senderThread(&DummyClient::SendPackages, this);
                senderThread.detach();

                using namespace std::chrono;
                auto started = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();

                while (true){

                    boost::this_thread::sleep_for(boost::chrono::milliseconds(100));

                    auto elapsedMilliseconds = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() - started;

                    system("cls");
                    std::cout << "                  ############  UDP  CLIENT  ############   \n\n";
                    std::cout << "                             Packages sent: " << PackagesSent << "\n";
                    std::cout << "                             Acks received: " << AcksReceived << "\n";
                    std::cout << "                           Bytes delivered: " << BytesToSize(BytesSentAndAcked) << "\n";
                    std::cout << "---------------------------------------------------------------------------\n";

                    if (AcksReceived > 0 && PackagesSent > 0)
                        std::cout << "                          Acked percentage: " << 100.0 * AcksReceived / PackagesSent << "\n";

                    std::cout << "---------------------------------------------------------------------------\n";
                    std::cout << "                                 Bandwidth: " << BytesToSize(1000 * BytesSentAndAcked / elapsedMilliseconds) << "/sec\n";
                    std::cout << "---------------------------------------------------------------------------\n";
                    std::cout << "                        MilliSecondsPerAck: " << MillisecondsPerAck << "\n";
                    std::cout << "                     ThrottleAgressiveness: " << ThrottleAgressiveness << "\n";
                    std::cout << "  MinimumOfMillisecondsToUsePerSentPackage: " << MinimumOfMillisecondsToUsePerSentPackage << "\n";
                    std::cout << "---------------------------------------------------------------------------\n";
                }
            }

            EXPOSE_IN_DLL ~DummyClient(){

            };

    private:

        void SendPackages(){
            io_service = std::make_shared<boost::asio::io_service>();
            socket = std::make_shared<boost::asio::ip::udp::socket>(*io_service);
            udpEndPoint = std::make_shared<boost::asio::ip::udp::endpoint>(boost::asio::ip::address_v4::from_string("127.0.0.1"), 56000);
            socket->open(boost::asio::ip::udp::v4());

            //Start throttling thread
            boost::thread throttleThread(&DummyClient::ThrottleThread, this);

            //Start Receiver thread - that listens for ACK packages from the Dummy server
            boost::thread receiverThread(&DummyClient::ReceiverThread, this);

            //Start sending packages - slowly 
            unsigned char dummyData[PacketSize];
            auto bufferToSend = boost::asio::buffer(dummyData, PacketSize);
            for (int i = 0; i < 100000; i++)
            {
                //Send
                socket->send_to(bufferToSend, *udpEndPoint, boost::asio::socket_base::message_do_not_route);

                PackagesSent++;

                //Check if we need to sleep a little (throttling) before sending next package
                if (MinimumOfMillisecondsToUsePerSentPackage > 0)
                    boost::this_thread::sleep_for(boost::chrono::milliseconds(MinimumOfMillisecondsToUsePerSentPackage));
            }
        }

        //"Acks" are received here - we are just counting them to get the rate they're coming in at.
        void ReceiverThread(){

            //Need to wait until first package is sent - so that the local endpoint is bound
            while (PackagesSent == 0){
                boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
            }

            //Set up receiving buffer
            unsigned char receiveBuffer[100];
            auto bufferToReceiveInto = boost::asio::buffer(receiveBuffer, 100);
            boost::asio::ip::udp::endpoint senderEndPoint;

            //Start Receiving!
            while (true){
                socket->receive_from(bufferToReceiveInto, senderEndPoint);
                AcksReceived++;
                BytesSentAndAcked += PacketSize;
            }
        }


        //Counts acks per interval - and adjusts outgoing speed accordingly.
        void ThrottleThread()
        {
            while (true)
            {
                boost::this_thread::sleep_for(boost::chrono::milliseconds(intervalMilliseconds));

                //Find out how many packages we got since last time - and send a little bit faster than this number.
                if (PackagesSent > 0)
                {
                    AcksSinceLastTime = AcksReceived - AcksLastTime;
                    AcksLastTime = AcksReceived;

                    if (AcksSinceLastTime == 0)
                    {
                        //No new packages got acked - slow down!
                        MinimumOfMillisecondsToUsePerSentPackage = 200;
                        continue;
                    }

                    //Increase sending speed to a little bit faster than we're receiving packages
                    MillisecondsPerAck = 1.0 * intervalMilliseconds / AcksSinceLastTime;
                    MinimumOfMillisecondsToUsePerSentPackage = MillisecondsPerAck * ThrottleAgressiveness;
                }
            }
        }


        //Util method
        std::string BytesToSize(long long Bytes){
            float tb = 1099511627776;
            float gb = 1073741824;
            float mb = 1048576;
            float kb = 1024;

            std::string returnSize = "";

            if (Bytes >= tb){
                returnSize += boost::lexical_cast<std::string>((float)Bytes / tb);
                returnSize += " TB";
            }
            else if (Bytes >= gb && Bytes < tb)
            {
                returnSize += boost::lexical_cast<std::string>((float)Bytes / gb);
                returnSize += " GB";
            }
            else if (Bytes >= mb && Bytes < gb){
                returnSize += boost::lexical_cast<std::string>((float)Bytes / mb);
                returnSize += " MB";
            }
            else if (Bytes >= kb && Bytes < mb){
                returnSize += boost::lexical_cast<std::string>((float)Bytes / kb);
                returnSize += " KB";
            }
            else{
                returnSize += boost::lexical_cast<std::string>(Bytes);
                returnSize += " Bytes";
            }
            return returnSize;
        }
    };
}

SERVER:

#pragma once

#include <boost/asio.hpp>
#include <boost/chrono.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <memory>

#define EXPOSE_IN_DLL __declspec(dllexport)

using namespace boost::asio;

namespace dummy{
class DummyServer
{
    public:

        std::shared_ptr<ip::udp::socket> listenSocket;
        std::shared_ptr<io_service> listenIoService;

        int PackagesReceived = 0;

        EXPOSE_IN_DLL DummyServer()
        {
            boost::thread receiverThread(&DummyServer::ReceivePackages, this);
            receiverThread.detach();

            //Print status
            while (true){
                boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
                system("cls");
                std::cout << "      ############  UDP  SERVER  ############   \n\n";
                std::cout << "       Packages received: " << PackagesReceived << "\n";
            }
        }

        EXPOSE_IN_DLL ~DummyServer(){}


        void ReceivePackages(){
            //Start Receiver thread
            listenIoService = std::make_shared<io_service>();
            listenSocket = std::make_shared<ip::udp::socket>(*listenIoService, ip::udp::endpoint(ip::udp::v4(), 56000));

            //Set up receiving buffer
            auto bytesReceived = 0;
            unsigned char receiveBuffer[1024 * 70];
            auto bufferToReceiveInto = boost::asio::buffer(receiveBuffer, 1024 * 70);

            unsigned char returnBufferBuffer[10];
            auto bufferToSendBackToClient = boost::asio::buffer(returnBufferBuffer, 10);

            ip::udp::endpoint senderEndPoint;
            //Receive packages
            while (true){
                listenSocket->receive_from(bufferToReceiveInto, senderEndPoint);
                PackagesReceived++;

                //Send an "Ack" package back to client - letting him know that a package successfully made it.
                //Doesn't matter what the package contains - since client is just counting.
                listenSocket->send_to(bufferToSendBackToClient, senderEndPoint);
            }
        }
};

}

Solution

Found the problem at last: it was sleep_for that caused the low speeds.

This code

boost::this_thread::sleep_for(boost::chrono::milliseconds(20));

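often takes over 30 ms when other threads are doing work. Since the client sends one 65,000-byte packet per sleep, a delay of 30 ms per packet caps throughput at roughly 65,000 bytes / 0.030 s, or about 2 MB/s, which is consistent with the rate I was seeing.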
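To check how far a sleep overshoots on a particular machine, it can be timed directly. The following is only a minimal sketch: it uses std::this_thread::sleep_for and std::chrono::steady_clock rather than the Boost equivalents, and AverageSleepMs is a hypothetical helper name that does not appear in the code above. On Windows the default scheduler timer granularity is typically around 15.6 ms, which is a likely reason for short sleeps running long, but the actual numbers depend on the system.

```cpp
#include <chrono>
#include <thread>

// Hypothetical helper (not part of the original client): returns the average
// wall-clock time, in milliseconds, that sleep_for(requested_ms) really takes
// over `iterations` calls.
double AverageSleepMs(int requested_ms, int iterations) {
    using clock = std::chrono::steady_clock;
    double total_ms = 0.0;
    for (int i = 0; i < iterations; ++i) {
        const auto start = clock::now();
        std::this_thread::sleep_for(std::chrono::milliseconds(requested_ms));
        const auto elapsed = clock::now() - start;
        total_ms += std::chrono::duration<double, std::milli>(elapsed).count();
    }
    return total_ms / iterations;
}

// Upper bound on throughput (bytes per second) when exactly one packet of
// `packet_bytes` is sent per sleep of `sleep_ms` milliseconds.
double ThroughputCapBytesPerSec(double packet_bytes, double sleep_ms) {
    return packet_bytes / (sleep_ms / 1000.0);
}
```

Calling AverageSleepMs(20, 50) and passing the result to ThroughputCapBytesPerSec(65000, ...) gives an estimate of the ceiling that the sleep-based throttle imposes on the client.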

I guess I have to use a different throttling strategy (counting packages in flight, etc.) or use a high-precision multimedia timer.
