Cancel async_read due to timeout
Problem description
I'm trying to write a synchronous wrapper method around async_read to allow non-blocking reads on a socket. Following several examples around the internet, I have developed a solution that seems to be almost right but is not working.
The class declares these relevant attributes and methods:
    class communications_client
    {
    protected:
        boost::shared_ptr<boost::asio::io_service> _io_service;
        boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
        boost::array<boost::uint8_t, 128> _data;
        boost::mutex _mutex;
        bool _timeout_triggered;
        bool _message_received;
        boost::system::error_code _error;
        size_t _bytes_transferred;

        void handle_read(const boost::system::error_code & error, size_t bytes_transferred);
        void handle_timeout(const boost::system::error_code & error);
        size_t async_read_helper(unsigned short bytes_to_transfer, const boost::posix_time::time_duration & timeout, boost::system::error_code & error);
        ...
    };
The method async_read_helper is the one that encapsulates all the complexity, while the other two, handle_read and handle_timeout, are just the event handlers. Here is the implementation of the three methods:
    void communications_client::handle_timeout(const boost::system::error_code & error)
    {
        if (!error)
        {
            _mutex.lock();
            _timeout_triggered = true;
            _error.assign(boost::system::errc::timed_out, boost::system::system_category());
            _mutex.unlock();
        }
    }

    void communications_client::handle_read(const boost::system::error_code & error, size_t bytes_transferred)
    {
        _mutex.lock();
        _message_received = true;
        _error = error;
        _bytes_transferred = bytes_transferred;
        _mutex.unlock();
    }

    size_t communications_client::async_read_helper(unsigned short bytes_to_transfer, const boost::posix_time::time_duration & timeout, boost::system::error_code & error)
    {
        _timeout_triggered = false;
        _message_received = false;

        boost::asio::deadline_timer timer(*_io_service);
        timer.expires_from_now(timeout);
        timer.async_wait(
            boost::bind(
                &communications_client::handle_timeout,
                this,
                boost::asio::placeholders::error));

        boost::asio::async_read(
            *_socket,
            boost::asio::buffer(_data, 128),
            boost::asio::transfer_exactly(bytes_to_transfer),
            boost::bind(
                &communications_client::handle_read,
                this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));

        while (true)
        {
            _io_service->poll_one();
            if (_message_received)
            {
                timer.cancel();
                break;
            }
            else if (_timeout_triggered)
            {
                _socket->cancel();
                break;
            }
        }

        return _bytes_transferred;
    }
The main question I have is: why does this work with a loop on _io_service->poll_one(), but not without a loop and calling _io_service->run_one()? Also, I would like to know whether it looks correct to anyone who is more used to working with Boost and Asio. Thank you!
FIX PROPOSAL #1
According to the comments made by Jonathan Wakely, the loop could be replaced by a call to _io_service->run_one(), with a call to _io_service->reset() after the operations have finished. It should look like:
    _io_service->run_one();
    if (_message_received)
    {
        timer.cancel();
    }
    else if (_timeout_triggered)
    {
        _socket->cancel();
    }
    _io_service->reset();
After some testing, I found that this kind of solution alone does not work. The handle_timeout method is called continuously with the error code operation_aborted. How can these calls be stopped?
FIX PROPOSAL #2
The answer by twsansbury is accurate and well grounded in the documentation. That implementation leads to the following code within async_read_helper:
    while (_io_service->run_one())
    {
        if (_message_received)
        {
            timer.cancel();
        }
        else if (_timeout_triggered)
        {
            _socket->cancel();
        }
    }
    _io_service->reset();
and the following change to the handle_read method:
    void communications_client::handle_read(const boost::system::error_code & error, size_t bytes_transferred)
    {
        if (error != boost::asio::error::operation_aborted)
        {
            ...
        }
    }
This solution has proved solid and correct during testing.
Solution
The main difference between io_service::run_one() and io_service::poll_one() is that run_one() will block until a handler is ready to run, whereas poll_one() will not wait for any outstanding handlers to become ready.
Assuming the only outstanding handlers on _io_service are handle_timeout() and handle_read(), run_one() does not require a loop because it will only return once either handle_timeout() or handle_read() has run. On the other hand, poll_one() requires a loop because it returns immediately while neither handle_timeout() nor handle_read() is ready to run; the loop keeps polling until one of them has run, which eventually lets the function return.
The main issue with the original code, as well as with fix proposal #1, is that there are still outstanding handlers in the io_service when async_read_helper() returns. Upon the next call to async_read_helper(), the next handler to be invoked will be a handler from the previous call. The io_service::reset() method only allows the io_service to resume running from a stopped state; it does not remove any handlers already queued into the io_service. To account for this behavior, try using a loop to consume all of the handlers from the io_service. Once all handlers have been consumed, exit the loop and reset the io_service:
    // Consume all handlers.
    while (_io_service->run_one())
    {
        if (_message_received)
        {
            // Message received, so cancel the timer. This will force the completion of
            // handle_timeout, with boost::asio::error::operation_aborted as the error.
            timer.cancel();
        }
        else if (_timeout_triggered)
        {
            // Timeout occurred, so cancel the socket. This will force the completion of
            // handle_read, with boost::asio::error::operation_aborted as the error.
            _socket->cancel();
        }
    }
    // Reset service, guaranteeing it is in a good state for subsequent runs.
    _io_service->reset();
From the caller's perspective, this form of timeout is synchronous, as run_one() blocks. However, work is still being done within the I/O service. An alternative is to use Boost.Asio's support for C++ futures to wait on a future and perform a timeout. This code can be easier to read, but it requires at least one other thread to be processing the I/O service, as the thread waiting on the timeout is no longer processing the I/O service:
    // Use an asynchronous operation so that it can be cancelled on timeout.
    std::future<std::size_t> read_result = boost::asio::async_read(
        socket, buffer, boost::asio::use_future);

    // If timeout occurs, then cancel the operation.
    if (read_result.wait_for(std::chrono::seconds(1)) ==
        std::future_status::timeout)
    {
        socket.cancel();
    }
    // Otherwise, the operation completed (with success or error).
    else
    {
        // If the operation failed, then read_result.get() will throw a
        // boost::system::system_error.
        auto bytes_transferred = read_result.get();
        // process buffer
    }
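The wait_for() / future_status check in that snippet is plain standard-library behavior, so it can be tried without a socket. The sketch below is an illustration under stated assumptions rather than the answer's code: read_with_timeout is a hypothetical name, and a std::promise fulfilled by a worker thread replaces both the Asio read and the thread that would be running the I/O service. The value 42 is an arbitrary made-up byte count.

```cpp
#include <chrono>
#include <cstddef>
#include <future>
#include <thread>

// Hypothetical helper: returns true and sets `bytes` if the simulated read
// finished within `timeout`; returns false and leaves `bytes` untouched otherwise.
bool read_with_timeout(std::chrono::milliseconds read_delay,
                       std::chrono::milliseconds timeout,
                       std::size_t& bytes)
{
    std::promise<std::size_t> promise;
    std::future<std::size_t> result = promise.get_future();

    // Stand-in for the thread that would be processing the I/O service:
    // it completes the "read" after read_delay.
    std::thread worker([&promise, read_delay] {
        std::this_thread::sleep_for(read_delay);
        promise.set_value(42); // pretend 42 bytes were read
    });

    // The calling thread only waits; it does none of the work itself.
    const bool completed =
        result.wait_for(timeout) == std::future_status::ready;
    if (completed)
        bytes = result.get();

    // With a real socket, this is where socket.cancel() would be called on
    // timeout. This sketch has nothing to cancel, so it lets the worker finish.
    worker.join();
    return completed;
}
```

The design point this makes visible is the one stated above: the waiting thread makes no progress on the operation, so something else (here the worker thread, in the real code a thread running the I/O service) has to complete it.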