由于超时取消async_read [英] Cancel async_read due to timeout

查看:381
本文介绍了由于超时取消async_read的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图在 async_read 周围编写一个包装器同步方法,以允许在套接字上进行非阻塞读取。下面几个关于互联网的例子我已经开发了一个似乎几乎正确但是不工作的解决方案。



类声明这些相关的属性和方法:

  class communications_client 
{
protected:
boost :: shared_ptr< boost :: asio :: io_service& _io_service;
boost :: shared_ptr& boost :: asio :: ip :: tcp :: socket> _插座;
boost :: array< boost :: uint8_t,128> _数据;

boost :: mutex _mutex;
bool _timeout_triggered;
bool _message_received;
boost :: system :: error_code _error;
size_t _bytes_transferred;

void handle_read(const boost :: system :: error_code&错误,size_t bytes_transferred);
void handle_timeout(const boost :: system :: error_code& error);
size_t async_read_helper(unsigned short bytes_to_transfer,const boost :: posix_time :: time_duration& timeout,boost :: system :: error_code& error);

...
}

方法 async_read_helper 是封装所有复杂性的函数,而另外两个 handle_read handle_timeout 只是事件处理程序。这里是三个方法的实现:

  void communications_client :: handle_timeout(const boost :: system :: error_code& error )
{
if(!error)
{
_mutex.lock();
_timeout_triggered = true;
_error.assign(boost :: system :: errc :: timed_out,boost :: system :: system_category());
_mutex.unlock();
}
}

void communications_client :: handle_read(const boost :: system :: error_code& error,size_t bytes_transferred)
{
_mutex。锁();
_message_received = true;
_error = error;
_bytes_transferred = bytes_transferred;
_mutex.unlock();
}

size_t communications_client :: async_read_helper(unsigned short bytes_to_transfer,const boost :: posix_time :: time_duration& timeout,boost :: system :: error_code& error)
{
_timeout_triggered = false;
_message_received = false;

boost :: asio :: deadline_timer timer(* _ io_service);
timer.expires_from_now(timeout);
timer.async_wait(
boost :: bind(
& communications_client :: handle_timeout,
this,
boost :: asio :: placeholders :: error)) ;

boost :: asio :: async_read(
* _socket,
boost :: asio :: buffer(_data,128),
boost :: asio :: transfer_exactly(bytes_to_transfer),
boost :: bind(
& communications_client :: handle_read,
this,
boost :: asio :: placeholders :: error,
boost :: asio :: placeholder :: bytes_transferred));

while(true)
{
_io_service-> poll_one();
if(_message_received)
{
timer.cancel();
break;
}
else if(_timeout_triggered)
{
_socket-> cancel();
break;
}
}

return _bytes_transferred;
}

我遇到的主要问题是:为什么这是一个循环< c $ c> _io_service-> poll_one()并且没有循环并调用 _io_service-> run_one()?此外,我想知道,如果它看起来正确的任何人谁更习惯使用Boost和Asio。谢谢!






FIX PROPOSAL#1



根据 Jonathan Wakely 所做的评论,循环可以使用 _io_service-> run_one(),呼叫 _io_service-> reset() 。它应该像:

  _io_service-> run_one 
if(_message_received)
{
timer.cancel();
}
else if(_timeout_triggered)
{
_socket-> cancel();
}

_io_service-> reset();

经过一些测试,我检查了这种解决方案是不是工作。持续调用 handle_timeout 方法,错误代码 operation_aborted 。如何停止这些通话?



FIX PROPOSAL#2



作者: twsansbury 是准确的,并且基于完整的文档基础。该实现导致 async_read_helper 中的以下代码:

  while _io_service-> run_one())
{
if(_message_received)
{
timer.cancel();
}
else if(_timeout_triggered)
{
_socket-> cancel();
}
}
_io_service-> reset();

并对 handle_read 方法进行以下更改:

  void communications_client :: handle_read(const boost :: system :: error_code& error,size_t bytes_transferred)
{
if(error!= boost :: asio :: error :: operation_aborted)
{
...
}
}



这个解决方案在测试过程中已证明是稳定和正确的。

解决方案

io_service :: run_one() io_service :: poll_one() run_one()将阻塞,直到处理程序准备好运行,而 poll_one()将不会等待任何未处理的处理程序准备就绪。



假设 _io_service 上唯一未解决的处理程序是 handle_timeout() handle_read(),则 run_one()不需要循环, code> handle_timeout()或 handle_read()已运行。另一方面, poll_one()需要一个循环,因为 poll_one()将立即返回,因为 handle_timeout()也不能 handle_read()准备运行,使函数最终返回。



原始代码以及修复建议#1的主要问题是,在 async_read_helper()返回。在下一次调用 async_read_helper()时,要调用的下一个处理程序将是来自上一个调用的处理程序。 io_service: :reset() 方法只允许io_service从停止状态恢复运行,它不会删除已经排队到io_service中的任何处理程序。要考虑这种行为,请尝试使用循环来消耗io_service中的所有处理程序。一旦所有的处理程序都消耗完了,退出循环并重置io_service:

 所有处理程序。 
while(_io_service-> run_one())
{
if(_message_received)
{
//收到消息,因此取消计时器。这将强制完成
// handle_timer,并将boost :: asio :: error :: operation_aborted作为错误。
timer.cancel();
}
else if(_timeout_triggered)
{
//发生超时,因此取消套接字。这将强制完成
// handle_read,并将boost :: asio :: error :: operation_aborted作为错误。
_socket-> cancel();
}
}

//重置服务,保证后续运行处于良好状态。
_io_service-> reset();

从调用者的角度来看,这种形式的超时是同步的 run_one 块。但是,仍在I / O服务中进行工作。另一种方法是使用Boost.Asio的支持对于C ++未来等待未来并执行超时。这个代码可以更容易阅读,但它需要至少一个其他线程来处理I / O服务,因为等待超时的线程不再处理I / O服务:

  //使用异步操作,以便在超时后可以取消。 
std :: future< std :: size_t> read_result = boost :: asio :: async_read(
socket,buffer,boost :: asio :: use_future);

//如果发生超时,则取消操作。
if(read_result.wait_for(std :: chrono :: seconds(1))==
std :: future_status :: timeout)
{
socket.cancel
}
//否则,操作完成(成功或错误)。
else
{
//如果操作失败,则on_read.get()将抛出
// boost :: system :: system_error。
auto bytes_transferred = read_result.get();
//进程缓冲区
}


I'm trying to write a wrapper synchronous method around async_read to allow non blocking reads on a socket. Following several examples around internet I have developed a solution that seems to be almost right but which is not working.

The class declares these relevant attributes and methods:

class communications_client
{
    protected:
        boost::shared_ptr<boost::asio::io_service> _io_service;
        boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
        boost::array<boost::uint8_t, 128> _data;

        boost::mutex _mutex;
        bool _timeout_triggered;
        bool _message_received;
        boost::system::error_code _error;
        size_t _bytes_transferred;

        void handle_read(const boost::system::error_code & error, size_t bytes_transferred);
        void handle_timeout(const boost::system::error_code & error);
        size_t async_read_helper(unsigned short bytes_to_transfer, const boost::posix_time::time_duration & timeout, boost::system::error_code & error);

        ...
}

The method async_read_helper is the one that encapsulates all the complexity, while the other two handle_readand handle_timeout are just the event handlers. Here is the implementation of the three methods:

void communications_client::handle_timeout(const boost::system::error_code & error)
{
    if (!error)
    {
        _mutex.lock();
        _timeout_triggered = true;
        _error.assign(boost::system::errc::timed_out, boost::system::system_category());
        _mutex.unlock();
    }
}

void communications_client::handle_read(const boost::system::error_code & error, size_t bytes_transferred)
{
    _mutex.lock();
    _message_received = true;
    _error = error;
    _bytes_transferred = bytes_transferred;
    _mutex.unlock();
}

size_t communications_client::async_read_helper(unsigned short bytes_to_transfer, const boost::posix_time::time_duration & timeout, boost::system::error_code & error)
{
    _timeout_triggered = false;
    _message_received = false;

    boost::asio::deadline_timer timer(*_io_service);
    timer.expires_from_now(timeout);
    timer.async_wait(
        boost::bind(
            &communications_client::handle_timeout,
            this,
            boost::asio::placeholders::error));

    boost::asio::async_read(
        *_socket,
        boost::asio::buffer(_data, 128),
        boost::asio::transfer_exactly(bytes_to_transfer),
        boost::bind(
            &communications_client::handle_read,
            this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));

    while (true)
    {
        _io_service->poll_one();
        if (_message_received)
        {
            timer.cancel();
            break;
        }
        else if (_timeout_triggered)
        {
            _socket->cancel();
            break;
        }
    }

    return _bytes_transferred;
}

The main question I have is: why this works with a loop on _io_service->poll_one()and no without a loop and calling _io_service->run_one()? Also, I would like to know if it looks correct to anyone who is more used to work with Boost and Asio. Thank you!


FIX PROPOSAL #1

According to the comments done by Jonathan Wakely the loop could be replaced using _io_service->run_one() with a call to _io_service->reset() after the operations have finished. It should look like:

_io_service->run_one();
if (_message_received)
{
    timer.cancel();
}
else if (_timeout_triggered)
{
    _socket->cancel();
}

_io_service->reset();

After some testing, I have checked that this kind of solution alone is not working. The handle_timeoutmethod is being called continuously with the error code operation_aborted. How can these calls be stopped?

FIX PROPOSAL #2

The answer by twsansbury is accurate and based onto solid documentation basis. That implementation leads to the following code within the async_read_helper:

while (_io_service->run_one())
{
    if (_message_received)
    {
        timer.cancel();
    }
    else if (_timeout_triggered)
    {
        _socket->cancel();
    }
}
_io_service->reset();

and the following change to the handle_read method:

void communications_client::handle_read(const boost::system::error_code & error, size_t bytes_transferred)
{
    if (error != boost::asio::error::operation_aborted)
    {
        ...
    }
}

This solution has proved solid and correct during testing.

解决方案

The main difference between io_service::run_one() and io_service::poll_one() is that run_one() will block until a handler is ready to run, whereas poll_one() will not wait for any outstanding handlers to become ready.

Assuming the only outstanding handlers on _io_service are handle_timeout() and handle_read(), then run_one() does not require a loop because it will only return once either handle_timeout() or handle_read() have ran. On the other hand, poll_one() requires a loop because poll_one() will return immediately, as neither handle_timeout() nor handle_read() are ready to run, causing the function to eventually return.

The main issue with the original code, as well as the fix proposal #1, is that there are still outstanding handlers in the io_service when async_read_helper() returns. Upon the next call to async_read_helper(), the next handler to be invoked will be a handler from the previous call. The io_service::reset() method only allows the io_service to resume running from a stopped state, it does not remove any handlers already queued into the io_service. To account for this behavior, try using a loop to consume all of the handlers from the io_service. Once all handlers have been consumed, exit the loop and reset the io_service:

// Consume all handlers.
while (_io_service->run_one())
{
  if (_message_received)
  {
    // Message received, so cancel the timer.  This will force the completion of
    // handle_timer, with boost::asio::error::operation_aborted as the error.
    timer.cancel();
  }
  else if (_timeout_triggered)
  {
    // Timeout occured, so cancel the socket.  This will force the completion of
    // handle_read, with boost::asio::error::operation_aborted as the error.
    _socket->cancel();
  }
}

// Reset service, guaranteeing it is in a good state for subsequent runs.
_io_service->reset();

From the caller's perspective, this form of timeout is synchronous as run_one() blocks. However, work is still being made within the I/O service. An alternative is to use Boost.Asio's support for C++ futures to wait on a future and perform a timeout. This code can be easier to read, but it requires at least one other thread to be processing the I/O service, as the thread waiting on the timeout is no longer processing the I/O service:

// Use an asynchronous operation so that it can be cancelled on timeout.
std::future<std::size_t> read_result = boost::asio::async_read(
    socket, buffer, boost::asio::use_future);

// If timeout occurs, then cancel the operation.
if (read_result.wait_for(std::chrono::seconds(1)) == 
    std::future_status::timeout)
{
  socket.cancel();
}
// Otherwise, the operation completed (with success or error).
else
{
  // If the operation failed, then on_read.get() will throw a
  // boost::system::system_error.
  auto bytes_transferred = read_result.get();
  // process buffer
}

这篇关于由于超时取消async_read的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆