What is the best way to send many buffers with Boost::Asio method async_send_to


Problem Description

What is the best way to send many buffers with Boost::Asio's async_send_to method? The whole send procedure can be repeated at any time, and furthermore I want to determine the (correct) elapsed time of each send procedure.

I tried it like this:

//MainWindow.h

class MainWindow : public QMainWindow
{
    Q_OBJECT
public:
    explicit MainWindow(QWidget *parent = 0);
    ~MainWindow();
private slots:
    void on_connectPushButton_clicked();
    void on_asyncSendPushButton_clicked();
private:
    Ui::MainWindow *ui;
    QTime m_Timer;
    int m_BufferSize;
    int m_NumBuffersToSend;
    int m_TransferredBuffers;
    boost::asio::io_service m_IOService;
    std::unique_ptr<boost::asio::ip::udp::socket>     m_pSocket;
    boost::asio::ip::udp::endpoint m_ReceiverEndpoint;
    void handle_send(const boost::system::error_code& error, std::size_t size);
    void stopTimerAndLog();
};

//MainWindow.cpp

#include "MainWindow.h"
#include "ui_MainWindow.h"

//Some Qt includes

#include <boost/timer/timer.hpp>
#include <boost/array.hpp>
#include <boost/bind.hpp>

using boost::asio::ip::udp;

MainWindow::MainWindow(QWidget *parent) :
    QMainWindow(parent),
    ui(new Ui::MainWindow),
    m_BufferSize(0),
    m_NumBuffersToSend(0),
    m_TransferredBuffers(0)
{
    ui->setupUi(this);
}

MainWindow::~MainWindow()
{
    delete ui;
}


void MainWindow::on_connectPushButton_clicked()
{
    try
    {
        udp::resolver resolver(m_IOService);
        udp::resolver::query query(udp::v4(), ui->serverIpAddressLineEdit->text().toStdString(),
                ui->serverPortLineEdit->text().toStdString());
        m_ReceiverEndpoint = *resolver.resolve(query);
        m_pSocket = std::unique_ptr<boost::asio::ip::udp::socket>(new boost::asio::ip::udp::socket(m_IOService));
        m_pSocket->open(udp::v4());
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
}

void MainWindow::stopTimerAndLog()
{
    int tmm = m_Timer.elapsed();
    double mBitPerSecond = 1000.0 * static_cast<double>(m_BufferSize * m_NumBuffersToSend)
            / ( 1024.0 * 1024.0 * tmm) * 8.0;
    LOG_INFO(__FUNCTION__ << ": " << QString("Buffer size: %1").arg(m_BufferSize).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("Num Buffers: %1").arg(m_NumBuffersToSend).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("Time: %1 ms").arg(tmm).toStdString());
    LOG_INFO(__FUNCTION__ << ": " << QString("%1 MBit/s").arg(mBitPerSecond).toStdString());
    ui->mBitperSecondDoubleSpinBox->setValue(mBitPerSecond);
}

void MainWindow::handle_send(const boost::system::error_code &error, size_t size)
{
    m_TransferredBuffers++;

    if (error)
    {
        //missing error propagation to main thread
        LOG_ERROR(__FUNCTION__ << ": ERROR: Client error while sending (error code = " << error << "): ");
        LOG_ERROR(__FUNCTION__ << ": ERROR: Recovering...");
    }

    if ( m_TransferredBuffers >= m_NumBuffersToSend )
    {
        stopTimerAndLog();
        m_IOService.stop();
    }
}

void MainWindow::on_asyncSendPushButton_clicked()
{
    try
    {
        m_BufferSize = ui->sendBufferSpinBox->value();
        char* data = new char[m_BufferSize];
        memset(data, 0, m_BufferSize);
        m_NumBuffersToSend = ui->numBufferSpinBox->value();

        m_Timer.start();

        for (int i=0; i < m_NumBuffersToSend; i++)
        {
            memset(data, i, m_BufferSize);

            m_pSocket->async_send_to(boost::asio::buffer(data, m_BufferSize),
                    m_ReceiverEndpoint,
                    boost::bind(&MainWindow::handle_send, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
        }
        m_TransferredBuffers = 0;
        m_IOService.run();
        delete[] data;
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
}

As you can see, the user first clicks the connect button (on_connectPushButton_clicked). The send procedure then starts by clicking the async send button (on_asyncSendPushButton_clicked). Here I start the timer and call the async_send_to method m_NumBuffersToSend times. Then I run the IOService. For each async_send_to the handler handle_send will be called, and the m_TransferredBuffers variable will be incremented until it reaches m_NumBuffersToSend. When that happens, I stop the timer and the IOService.

But if I compare the time calculated in my program with the real UDP packets sent, as captured with Wireshark, there is always a big difference. How can I get a more accurate time calculation?

Is it possible to place m_IOService.run(); outside of on_asyncSendPushButton_clicked?

Recommended Answer

Well. I'm not sure what you are observing. Here's the answer to:

Q. Is it possible to place the m_IOService.run(); call outside on_asyncSendPushButton_clicked?

Yes, you should use io_service::work to keep the IO service running. Here's a demo program:

Live On Coliru


  • I created a single IO thread to service the asynchronous operations/completion handlers

  • I have stripped the Qt dependency; the demo performs Runs with random configurations:

struct Run {
    std::vector<char> buffer = std::vector<char>(rand()%800 + 200, '\0');
    int remainingToSend      = rand()%10 + 1;
    int transferredBuffers   = 0;
    Clock::time_point start  = Clock::now();

    void stopTimerAndLog() const;
};


  • As a bonus, I added proper statistics using Boost Accumulators

    Instead of doing (expensive) IO in stopTimerAndLog, we add the samples to the accumulators:

    void stopTimerAndLog()
    {
        using namespace std::chrono;
    
        Clock::duration const elapsed = Clock::now() - start;
        int tmm = duration_cast<microseconds>(elapsed).count();
    
        double mBitPerSecond = tmm
            ? buffer.size() * transferredBuffers * 8.0 / 1024 / 1024 / (tmm / 1000000.0)
            : std::numeric_limits<double>::infinity();
    
        std::lock_guard<std::mutex> lk(demo_results::mx);
    
        demo_results::bufsize(buffer.size());
        demo_results::micros(tmm);
        if (tmm)
            demo_results::mbps(mBitPerSecond);
    }
    


  • You can run multiple demo Runs in overlap:

    Demo demo;
    demo.on_connect(argv[1], argv[2]);
    
    for (int i = 0; i<100; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        demo.on_async_testrun();
    }
    // Demo destructor joins IO thread, making sure all stats are final
    


  • The mutex guarding the statistics is redundant, but GoodPractice(TM), since you may want to test with multiple IO threads

    Output:

    avg. Buffer size: 613.82, std.dev. 219.789
    avg. b/w:         160.61 mbps, std.dev. 81.061
    avg. time:        153.64 μs, std.dev. 39.0163
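
    The point about the mutex matters as soon as run() is called from more than one thread, because completion handlers may then execute concurrently. A standard-library-only sketch of the same pattern (four worker threads standing in hypothetically for multiple io_service::run() threads, all recording samples into one shared container under a lock, just like demo_results::mx guards the accumulators):

    ```cpp
    #include <cassert>
    #include <mutex>
    #include <thread>
    #include <vector>

    int main() {
        std::mutex mx;               // plays the role of demo_results::mx
        std::vector<int> samples;    // plays the role of the accumulator_set

        std::vector<std::thread> workers;
        for (int t = 0; t < 4; ++t) {
            workers.emplace_back([&, t] {
                for (int i = 0; i < 100; ++i) {
                    std::lock_guard<std::mutex> lk(mx); // serialize access to shared stats
                    samples.push_back(t);
                }
            });
        }
        for (auto& w : workers) w.join();

        assert(samples.size() == 400); // no lost updates thanks to the lock
        return 0;
    }
    ```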
    

    Full listing:

    #include <boost/asio.hpp>
    #include <boost/array.hpp>
    #include <boost/make_shared.hpp>
    #include <boost/bind.hpp>
    #include <thread>
    #include <mutex>
    #include <chrono>
    #include <memory>
    #include <iostream>
    #include <boost/accumulators/accumulators.hpp>
    #include <boost/accumulators/statistics.hpp>
    
    using boost::asio::ip::udp;
    typedef std::chrono::high_resolution_clock Clock;
    
    namespace demo_results {
        using namespace boost::accumulators;
    
        static std::mutex mx;
        accumulator_set<double, stats<tag::mean, tag::median, tag::variance> > bufsize, mbps, micros;
    }
    
    struct Run {
        std::vector<char> buffer = std::vector<char>(rand()%800 + 200, '\0');
        int remainingToSend      = rand()%10 + 1;
        int transferredBuffers   = 0;
        Clock::time_point start  = Clock::now();
        Clock::duration elapsed;
    
        void stopTimerAndLog()
        {
            using namespace std::chrono;
    
            Clock::duration const elapsed = Clock::now() - start;
            int tmm = duration_cast<microseconds>(elapsed).count();
    
            double mBitPerSecond = tmm
                ? buffer.size() * transferredBuffers * 8.0 / 1024 / 1024 / (tmm / 1000000.0)
                : std::numeric_limits<double>::infinity();
    
            std::lock_guard<std::mutex> lk(demo_results::mx);
    
            demo_results::bufsize(buffer.size());
            demo_results::micros(tmm);
            if (tmm)
                demo_results::mbps(mBitPerSecond);
    
    #if 0
            std::cout << __FUNCTION__ << "  -----------------------------------------------\n";
            std::cout << __FUNCTION__ << ": " << "Buffer size: " << buffer.size()      << "\n";
            std::cout << __FUNCTION__ << ": " << "Num Buffers: " << transferredBuffers << "\n";
            std::cout << __FUNCTION__ << ": " << "Time: "        << tmm                << " μs\n";
            std::cout << __FUNCTION__ << ": " << mBitPerSecond   << " MBit/s\n";
    #endif
        }
    
        typedef boost::shared_ptr<Run> Ptr;
    };
    
    struct Demo {
        boost::asio::io_service                        m_IOService;
        std::unique_ptr<boost::asio::io_service::work> m_work;
        std::unique_ptr<boost::asio::ip::udp::socket>  m_pSocket;
        boost::asio::ip::udp::endpoint                 m_ReceiverEndpoint;
        std::thread                                    m_io_thread;
    
        Demo() :
            m_IOService(),
            m_work(new boost::asio::io_service::work(m_IOService)),
            m_io_thread([this] { m_IOService.run(); })
        {
        }
    
        ~Demo() {
            m_work.reset();
            m_io_thread.join();
        }
    
        void on_connect(std::string const& host, std::string const& port)
        {
            try {
                udp::resolver resolver(m_IOService);
                m_ReceiverEndpoint = *resolver.resolve(udp::resolver::query(udp::v4(), host, port));
                m_pSocket = std::unique_ptr<boost::asio::ip::udp::socket>(new boost::asio::ip::udp::socket(m_IOService));
                m_pSocket->open(udp::v4());
            }
            catch (std::exception& e)
            {
                std::cerr << e.what() << std::endl;
            }
        }
    
        void perform_run(Run::Ptr state) {
            if (state->remainingToSend) {
                std::fill(state->buffer.begin(), state->buffer.end(), state->remainingToSend);
    
                m_pSocket->async_send_to(boost::asio::buffer(state->buffer),
                        m_ReceiverEndpoint,
                        boost::bind(&Demo::handle_sent, this,
                            boost::asio::placeholders::error,
                            boost::asio::placeholders::bytes_transferred,
                            state));
            } else {
                state->stopTimerAndLog();
            }
        }
    
        void handle_sent(boost::system::error_code const&error, size_t actually_transferred, Run::Ptr state)
        {
            assert(actually_transferred == state->buffer.size());
            state->transferredBuffers += 1;
            state->remainingToSend    -= 1;
    
            if (error) {
                // missing error propagation to main thread
                std::cerr << __FUNCTION__ << ": ERROR: Client error while sending (error code = " << error.message() << "): ";
                std::cerr << __FUNCTION__ << ": ERROR: Recovering...";
            }
    
            perform_run(state); // remaining buffers for run
        }
    
        void on_async_testrun() {
            perform_run(boost::make_shared<Run>());
        }
    };
    
    int main(int argc, char const** argv)
    {
        assert(argc==3);
    
        {
            Demo demo;
            demo.on_connect(argv[1], argv[2]);
    
            for (int i = 0; i<100; ++i) {
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
                demo.on_async_testrun();
            }
        } // Demo destructor joins IO thread, making sure all stats are final
    
        using namespace boost::accumulators;
        std::cout << "avg. Buffer size: " << mean(demo_results::bufsize) << ", std.dev. "      << sqrt(variance(demo_results::bufsize)) << "\n";
        std::cout << "avg. b/w:         " << mean(demo_results::mbps)    << " mbps, std.dev. " << sqrt(variance(demo_results::mbps))    << "\n";
        std::cout << "avg. time:        " << mean(demo_results::micros)  << " μs, std.dev. "   << sqrt(variance(demo_results::micros))  << "\n";
    }
    

