What is the best way to send many buffers with the Boost::Asio method async_send_to?
Question
What is the best way to send many buffers with the Boost::Asio method async_send_to?
This whole send procedure can be repeated at any time. Furthermore, I want to determine the (correct) elapsed time of each send procedure.
I tried it like this:
//MainWindow.h
class MainWindow : public QMainWindow
{
Q_OBJECT
public:
explicit MainWindow(QWidget *parent = 0);
~MainWindow();
private slots:
void on_connectPushButton_clicked();
void on_asyncSendPushButton_clicked();
private:
Ui::MainWindow *ui;
QTime m_Timer;
int m_BufferSize;
int m_NumBuffersToSend;
int m_TransferredBuffers;
boost::asio::io_service m_IOService;
std::unique_ptr<boost::asio::ip::udp::socket> m_pSocket;
boost::asio::ip::udp::endpoint m_ReceiverEndpoint;
void handle_send(const boost::system::error_code& error, std::size_t size);
void stopTimerAndLog();
};
//MainWindow.cpp
#include "MainWindow.h"
#include "ui_MainWindow.h"
//Some Qt includes
#include <boost/timer/timer.hpp>
#include <boost/array.hpp>
#include <boost/bind.hpp>
using boost::asio::ip::udp;
MainWindow::MainWindow(QWidget *parent) :
m_BufferSize(0),
m_NumBuffersToSend(0),
m_TransferredBuffers(0),
QMainWindow(parent),
ui(new Ui::MainWindow)
{
ui->setupUi(this);
}
MainWindow::~MainWindow()
{
delete ui;
}
void MainWindow::on_connectPushButton_clicked()
{
try
{
udp::resolver resolver(m_IOService);
udp::resolver::query query(udp::v4(), ui->serverIpAddressLineEdit->text().toStdString(),
ui->serverPortLineEdit->text().toStdString());
m_ReceiverEndpoint = *resolver.resolve(query);
m_pSocket = std::unique_ptr<boost::asio::ip::udp::socket>(new boost::asio::ip::udp::socket(m_IOService));
m_pSocket->open(udp::v4());
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
}
void MainWindow::stopTimerAndLog()
{
int tmm = m_Timer.elapsed();
double mBitPerSecond = 1000.0 * static_cast<double>(m_BufferSize * m_NumBuffersToSend)
/ ( 1024.0 * 1024.0 * tmm) * 8.0;
LOG_INFO(__FUNCTION__ << ": " << QString("Buffer size: %1").arg(m_BufferSize).toStdString());
LOG_INFO(__FUNCTION__ << ": " << QString("Num Buffers: %1").arg(m_NumBuffersToSend).toStdString());
LOG_INFO(__FUNCTION__ << ": " << QString("Time: %1 ms").arg(tmm).toStdString());
LOG_INFO(__FUNCTION__ << ": " << QString("%1 MBit/s").arg(mBitPerSecond).toStdString());
ui->mBitperSecondDoubleSpinBox->setValue(mBitPerSecond);
}
void MainWindow::handle_send(const boost::system::error_code &error, size_t size)
{
m_TransferredBuffers++;
if (error)
{
//missing error propagation to main thread
LOG_ERROR(__FUNCTION__ << ": ERROR: Client error while sending (error code = " << error << "): ");
LOG_ERROR(__FUNCTION__ << ": ERROR: Recovering...");
}
if ( m_TransferredBuffers >= m_NumBuffersToSend )
{
stopTimerAndLog();
m_IOService.stop();
}
}
void MainWindow::on_asyncSendPushButton_clicked()
{
try
{
m_BufferSize = ui->sendBufferSpinBox->value();
char* data = new char[m_BufferSize];
memset(data, 0, m_BufferSize);
m_NumBuffersToSend = ui->numBufferSpinBox->value();
m_Timer.start();
for (int i=0; i < m_NumBuffersToSend; i++)
{
memset(data, i, m_BufferSize);
m_pSocket->async_send_to(boost::asio::buffer(data, m_BufferSize),
m_ReceiverEndpoint,
boost::bind(&MainWindow::handle_send, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
m_TransferredBuffers = 0;
m_IOService.run();
delete[] data;
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
}
As you can see, the user can click the connect button (on_connectPushButton_clicked). The send procedure then starts when the async send button is clicked (on_asyncSendPushButton_clicked). There I start the timer and call the async_send_to method m_NumBuffersToSend times. Then I run the IOService. For each async_send_to, the handler handle_send is called and the m_TransferredBuffers variable is incremented until it reaches m_NumBuffersToSend. When that happens, I stop the timer and the IOService.
But if I compare the time calculated in my program with the UDP packets actually sent, as captured with Wireshark, there is always a big difference. How can I get a more accurate time calculation?
Is it possible to place the m_IOService.run(); call outside on_asyncSendPushButton_clicked?
Recommended answer
Well. I'm not sure what you are observing. Here's the answer to

Q. Is it possible to place the m_IOService.run(); call outside on_asyncSendPushButton_clicked?
Yes, you should use io_service::work to keep the IO service running. Here's a demo program:
Live On Coliru
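- I created a single IO thread to service the async operations/completion handlers
- I have stripped the Qt dependency; the demo Runs are configured randomly: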
struct Run {
std::vector<char> buffer = std::vector<char>(rand()%800 + 200, '\0');
int remainingToSend = rand()%10 + 1;
int transferredBuffers = 0;
Clock::time_point start = Clock::now();
void stopTimerAndLog() const;
};
As a bonus, I added proper statistics using Boost Accumulators.
Instead of doing (expensive) IO in stopTimerAndLog we add the samples to the accumulators:
void stopTimerAndLog()
{
using namespace std::chrono;
Clock::duration const elapsed = Clock::now() - start;
int tmm = duration_cast<microseconds>(elapsed).count();
double mBitPerSecond = tmm
? buffer.size() * transferredBuffers * 8.0 / 1024 / 1024 / (tmm / 1000000.0)
: std::numeric_limits<double>::infinity();
std::lock_guard<std::mutex> lk(demo_results::mx);
demo_results::bufsize(buffer.size());
demo_results::micros(tmm);
if (tmm)
demo_results::mbps(mBitPerSecond);
}
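The throughput arithmetic used in stopTimerAndLog can also be looked at in isolation. The following is only a minimal sketch: mbit_per_second is a hypothetical helper name that does not appear in the original answer, and it assumes the same convention as the code above (1 MBit = 1024 * 1024 bits, elapsed time in microseconds).

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <limits>

// Hypothetical helper (not part of the original answer): converts a byte
// count and an elapsed duration into MBit/s, with 1 MBit = 1024 * 1024 bits
// as in the stopTimerAndLog formula.
double mbit_per_second(std::size_t bytes, std::chrono::microseconds elapsed)
{
    assert(elapsed.count() >= 0); // a negative duration indicates a caller bug

    // A run that finishes below the clock resolution cannot be measured;
    // report infinity, mirroring the guard in stopTimerAndLog.
    if (elapsed.count() == 0)
        return std::numeric_limits<double>::infinity();

    double const bits    = static_cast<double>(bytes) * 8.0;
    double const seconds = static_cast<double>(elapsed.count()) / 1000000.0;
    return bits / (1024.0 * 1024.0) / seconds;
}
```

For instance, ten buffers of 1024 bytes sent in 1000 microseconds come out at roughly 78.125 MBit/s with this convention.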
You can run multiple demo Runs in overlap:
Demo demo;
demo.on_connect(argv[1], argv[2]);
for (int i = 0; i<100; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
demo.on_async_testrun();
}
// Demo destructor joins IO thread, making sure all stats are final
The mutex guarding the statistics is redundant here, but it is GoodPractice(TM) since you may want to test with multiple IO threads.
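To illustrate why the lock starts to matter once more than one thread records samples, here is a small standard-library-only sketch. It is not the Boost accumulator code from the answer: GuardedStats and fill_from_threads are hypothetical names, and a running mean/variance (Welford's algorithm) stands in for the accumulator_set.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the accumulators: running mean and population
// variance, with every access serialized by a mutex.
class GuardedStats {
public:
    void add(double sample)
    {
        std::lock_guard<std::mutex> lk(mx_);
        ++n_;
        double const delta = sample - mean_;
        mean_ += delta / static_cast<double>(n_);
        m2_ += delta * (sample - mean_); // uses the updated mean
    }
    std::size_t count() const
    {
        std::lock_guard<std::mutex> lk(mx_);
        return n_;
    }
    double mean() const
    {
        std::lock_guard<std::mutex> lk(mx_);
        return mean_;
    }
    double variance() const // population variance (divides by n)
    {
        std::lock_guard<std::mutex> lk(mx_);
        return n_ ? m2_ / static_cast<double>(n_) : 0.0;
    }
    double stddev() const { return std::sqrt(variance()); }

private:
    mutable std::mutex mx_;
    std::size_t n_ = 0;
    double mean_ = 0.0;
    double m2_ = 0.0; // sum of squared deviations from the running mean
};

// Each worker thread adds the samples 1, 2, ..., per_thread concurrently.
void fill_from_threads(GuardedStats& stats, int threads, int per_thread)
{
    assert(threads > 0 && per_thread > 0);
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t)
        pool.emplace_back([&stats, per_thread] {
            for (int i = 1; i <= per_thread; ++i)
                stats.add(static_cast<double>(i));
        });
    for (auto& th : pool)
        th.join();
}
```

Without the lock_guard in add, concurrent updates of n_, mean_ and m2_ would be a data race. With a single IO thread, as in the demo, all completion handlers run on that one thread and the lock is never contended.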
Output:
avg. Buffer size: 613.82, std.dev. 219.789
avg. b/w: 160.61 mbps, std.dev. 81.061
avg. time: 153.64 μs, std.dev. 39.0163
Full Listing
#include <boost/asio.hpp>
#include <boost/array.hpp>
#include <boost/make_shared.hpp>
#include <boost/bind.hpp>
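#include <cassert>
#include <chrono>
#include <cmath>
#include <cstdlib>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>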
#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics.hpp>
using boost::asio::ip::udp;
typedef std::chrono::high_resolution_clock Clock;
namespace demo_results {
using namespace boost::accumulators;
static std::mutex mx;
accumulator_set<double, stats<tag::mean, tag::median, tag::variance> > bufsize, mbps, micros;
}
struct Run {
std::vector<char> buffer = std::vector<char>(rand()%800 + 200, '\0');
int remainingToSend = rand()%10 + 1;
int transferredBuffers = 0;
Clock::time_point start = Clock::now();
Clock::duration elapsed;
void stopTimerAndLog()
{
using namespace std::chrono;
Clock::duration const elapsed = Clock::now() - start;
int tmm = duration_cast<microseconds>(elapsed).count();
double mBitPerSecond = tmm
? buffer.size() * transferredBuffers * 8.0 / 1024 / 1024 / (tmm / 1000000.0)
: std::numeric_limits<double>::infinity();
std::lock_guard<std::mutex> lk(demo_results::mx);
demo_results::bufsize(buffer.size());
demo_results::micros(tmm);
if (tmm)
demo_results::mbps(mBitPerSecond);
#if 0
std::cout << __FUNCTION__ << " -----------------------------------------------\n";
std::cout << __FUNCTION__ << ": " << "Buffer size: " << buffer.size() << "\n";
std::cout << __FUNCTION__ << ": " << "Num Buffers: " << transferredBuffers << "\n";
std::cout << __FUNCTION__ << ": " << "Time: " << tmm << " μs\n";
std::cout << __FUNCTION__ << ": " << mBitPerSecond << " MBit/s\n";
#endif
}
typedef boost::shared_ptr<Run> Ptr;
};
struct Demo {
boost::asio::io_service m_IOService;
std::unique_ptr<boost::asio::io_service::work> m_work;
std::unique_ptr<boost::asio::ip::udp::socket> m_pSocket;
boost::asio::ip::udp::endpoint m_ReceiverEndpoint;
std::thread m_io_thread;
Demo() :
m_IOService(),
m_work(new boost::asio::io_service::work(m_IOService)),
m_io_thread([this] { m_IOService.run(); })
{
}
~Demo() {
m_work.reset();
m_io_thread.join();
}
void on_connect(std::string const& host, std::string const& port)
{
try {
udp::resolver resolver(m_IOService);
m_ReceiverEndpoint = *resolver.resolve(udp::resolver::query(udp::v4(), host, port));
m_pSocket = std::unique_ptr<boost::asio::ip::udp::socket>(new boost::asio::ip::udp::socket(m_IOService));
m_pSocket->open(udp::v4());
}
catch (std::exception& e)
{
std::cerr << e.what() << std::endl;
}
}
void perform_run(Run::Ptr state) {
if (state->remainingToSend) {
std::fill(state->buffer.begin(), state->buffer.end(), state->remainingToSend);
m_pSocket->async_send_to(boost::asio::buffer(state->buffer),
m_ReceiverEndpoint,
boost::bind(&Demo::handle_sent, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred,
state));
} else {
state->stopTimerAndLog();
}
}
void handle_sent(boost::system::error_code const&error, size_t actually_transferred, Run::Ptr state)
{
assert(actually_transferred == state->buffer.size());
state->transferredBuffers += 1;
state->remainingToSend -= 1;
if (error) {
// missing error propagation to main thread
std::cerr << __FUNCTION__ << ": ERROR: Client error while sending (error code = " << error.message() << "): ";
std::cerr << __FUNCTION__ << ": ERROR: Recovering...";
}
perform_run(state); // remaining buffers for run
}
void on_async_testrun() {
perform_run(boost::make_shared<Run>());
}
};
int main(int argc, char const** argv)
{
assert(argc==3);
{
Demo demo;
demo.on_connect(argv[1], argv[2]);
for (int i = 0; i<100; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
demo.on_async_testrun();
}
} // Demo destructor joins IO thread, making sure all stats are final
using namespace boost::accumulators;
std::cout << "avg. Buffer size: " << mean(demo_results::bufsize) << ", std.dev. " << sqrt(variance(demo_results::bufsize)) << "\n";
std::cout << "avg. b/w: " << mean(demo_results::mbps) << " mbps, std.dev. " << sqrt(variance(demo_results::mbps)) << "\n";
std::cout << "avg. time: " << mean(demo_results::micros) << " μs, std.dev. " << sqrt(variance(demo_results::micros)) << "\n";
}