我可以使用Boost.Asio在多线程I/O服务上超时地同步读取套接字吗? [英] Can I read from a socket synchronously using Boost.Asio with a timeout on a multithreaded I/O service?

查看:99
本文介绍了我可以使用Boost.Asio在多线程I/O服务上超时地同步读取套接字吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个使用Boost.Asio进行TCP和UDP套接字通信的应用程序.我了解"Asio"中的"A"代表 Asynchronous ,因此该库倾向于鼓励您在可能的情况下使用异步I/O.在某些情况下,最好使用同步套接字读取.但是,与此同时,我想对所述接收调用设置超时,因此不可能无限期地阻止读取.

I have an application that uses Boost.Asio for TCP and UDP socket communications. I understand that the 'A' in "Asio" stands for Asynchronous, so the library is bent toward encouraging you to use asynchronous I/O when possible. I have a few cases where synchronous socket reads are preferable. At the same time, however, I would like to set a timeout on said receive calls, so there's no possibility of the read blocking indefinitely.

在Boost.Asio用户中,这似乎是一个非常普遍的问题,以下是有关该主题的以下Stack Overflow问题:

This appears to be a pretty common problem among Boost.Asio users, with the following past Stack Overflow questions on the topic:

  • C++ Boost ASIO: how to read/write with a timeout?
  • asio::read with timeout
  • boost asio timeout
  • How to set a timeout on blocking sockets in boost asio?

甚至可能更多.甚至在文档,以了解如何实现带有超时的同步操作.他们归结为将同步操作转换为异步操作,然后与asio::deadline_timer并行启动.然后,计时器的到期处理程序可以在超时到期的情况下取消异步读取.看起来像这样(摘录自上面的链接示例):

There may even be more. There are even examples in the documentation for how to implement synchronous operations with timeouts. They boil down to converting the synchronous operation to an asynchronous one, then starting it in parallel with a asio::deadline_timer . The timer's expiration handler can then cancel the asynchronous read in the event that the timeout expires. This looks something like this (snippet taken from the above linked example):

std::size_t receive(const boost::asio::mutable_buffer& buffer,
      boost::posix_time::time_duration timeout, boost::system::error_code& ec)
  {
    // Set a deadline for the asynchronous operation.
    deadline_.expires_from_now(timeout);

    // Set up the variables that receive the result of the asynchronous
    // operation. The error code is set to would_block to signal that the
    // operation is incomplete. Asio guarantees that its asynchronous
    // operations will never fail with would_block, so any other value in
    // ec indicates completion.
    ec = boost::asio::error::would_block;
    std::size_t length = 0;

    // Start the asynchronous operation itself. The handle_receive function
    // used as a callback will update the ec and length variables.
    socket_.async_receive(boost::asio::buffer(buffer),
        boost::bind(&client::handle_receive, _1, _2, &ec, &length));

    // Block until the asynchronous operation has completed.
    do io_service_.run_one(); while (ec == boost::asio::error::would_block);

    return length;
  }

这实际上是一个相对干净的解决方案:启动异步操作,然后手动轮询asio::io_service以一次执行一个异步处理程序,直到async_receive()完成或计时器到期.

This actually is a relatively clean solution: start the asynchronous operations, then manually poll the asio::io_service to execute asynchronous handlers one at a time until either the async_receive() completes or the timer expires.

但是,如果套接字的基础I/O服务已经在一个或多个后台线程中运行,那该怎么办?在这种情况下,不能保证异步操作的处理程序会由上面的代码片段中的前台线程运行,因此run_one()不会返回,直到稍后执行某个可能不相关的处理程序.这会使套接字读取反应迟钝.

However, what about the case where the socket's underlying I/O service is already being run in one or more background threads? In that case, there's no guarantee that the handlers for the asynchronous operations would be run by the foreground thread in the above snippet, so run_one() wouldn't return until some later, possibly unrelated, handler executes. This would make the socket reads rather unresponsive.

asio::io_service具有一个poll_one()函数,该函数将在不阻塞的情况下检查服务的队列,但是我看不到在处理程序执行之前阻塞前台线程(模拟同步调用行为)的好方法,除了如果没有后台线程已经在执行asio::io_service::run()的情况.

asio::io_service has a poll_one() function that will check the service's queue without blocking, but I don't see a good way to block the foreground thread (emulating the synchronous call behavior) until the handler executes, except for the case where there are no background threads that are executing asio::io_service::run() already.

我看到两个潜在的解决方案,我都不喜欢:

I see two potential solutions, neither of which I like:

  1. 在启动异步操作之后,请使用条件变量或类似的构造来制作前台线程块.在async_receive()调用的处理程序中,向条件变量发出信号以解除对线程的阻塞.这会导致每次读取都有些锁定,我想避免这种锁定,因为我想在UDP套接字读取上实现最大可能的吞吐量.否则,它是可行的,除非有更好的方法,否则我可能会这样做.

  1. Use a condition variable or similar construct to make the foreground thread block after starting the asynchronous operations. In the handler for the async_receive() call, signal the condition variable to unblock the thread. This induces some locking for each read, which I would like to avoid, as I'd like to achieve the maximum possible throughput on the UDP socket reads. Otherwise, it is viable, and is probably what I would do unless a superior method presents itself.

确保套接字具有其自己的asio::io_service,该asio::io_service不会由任何后台线程运行.在需要的情况下,这使得使用套接字的异步I/O变得更加困难.

Ensure that the socket has its own asio::io_service that is not being run by any background threads. This makes it harder to use asynchronous I/O with the socket in the cases where that is desired.

是否有其他以安全方式实现此目标的想法?

Any ideas for other ways to accomplish this in a safe way?

此外:对于以前的SO问题,有一些答案提倡使用SO_RCVTIMEO socket选项实现套接字读取超时.从理论上讲,这听起来不错,但至少在我的平台上似乎不起作用(Ubuntu 12.04,Boost v1.55).我可以设置套接字超时,但是使用Asio不会达到预期的效果.相关代码在/boost/asio/detail/impl/socket_ops.ipp中:

Aside: There are some answers to previous SO questions that advocate using the SO_RCVTIMEO socket option to implement the socket read timeout. This sounds great in theory, but it doesn't seem to work on my platform at least (Ubuntu 12.04, Boost v1.55). I can set the socket timeout, but it won't give the desired effect with Asio. The relevant code is in /boost/asio/detail/impl/socket_ops.ipp:

size_t sync_recvfrom(socket_type s, state_type state, buf* bufs,
    size_t count, int flags, socket_addr_type* addr,
    std::size_t* addrlen, boost::system::error_code& ec)
{
  if (s == invalid_socket)
  {
    ec = boost::asio::error::bad_descriptor;
    return 0;
  }

  // Read some data.
  for (;;)
  {
    // Try to complete the operation without blocking.
    signed_size_type bytes = socket_ops::recvfrom(
        s, bufs, count, flags, addr, addrlen, ec);

    // Check if operation succeeded.
    if (bytes >= 0)
      return bytes;

    // Operation failed.
    if ((state & user_set_non_blocking)
        || (ec != boost::asio::error::would_block
          && ec != boost::asio::error::try_again))
      return 0;

    // Wait for socket to become ready.
    if (socket_ops::poll_read(s, 0, ec) < 0)
      return 0;
  }
}

如果套接字读取超时,则对以上recvfrom()的调用将返回EAGAINEWOULDBLOCK,它们将转换为boost::asio::error::try_againboost::asio::error::would_block.在这种情况下,上面的代码将调用poll_read()函数,对于我的平台,该函数如下所示:

If a socket read times out, the call to recvfrom() above would return EAGAIN or EWOULDBLOCK, which get translated to boost::asio::error::try_again or boost::asio::error::would_block. In this case, the above code will call the poll_read() function, which for my platform looks like:

int poll_read(socket_type s, state_type state, boost::system::error_code& ec)
{
  if (s == invalid_socket)
  {
    ec = boost::asio::error::bad_descriptor;
    return socket_error_retval;
  }

  pollfd fds;
  fds.fd = s;
  fds.events = POLLIN;
  fds.revents = 0;
  int timeout = (state & user_set_non_blocking) ? 0 : -1;
  clear_last_error();
  int result = error_wrapper(::poll(&fds, 1, timeout), ec);
  if (result == 0)
    ec = (state & user_set_non_blocking)
      ? boost::asio::error::would_block : boost::system::error_code();
  else if (result > 0)
    ec = boost::system::error_code();
  return result;
}

我摘录了有条件地为其他平台编译的代码.如您所见,如果套接字不是非阻塞套接字,它将最终以无限超时调用poll(),因此将阻塞直到套接字具有要读取的数据为止(并阻止超时尝试).因此,SO_RCVTIMEO选项无效.

I snipped out the code conditionally compiled for other platforms. As you can see, if the socket is not a non-blocking socket, it ends up calling poll() with an infinite timeout, therefore blocking until the socket has data to be read (and foiling the attempt at a timeout). Thus, the SO_RCVTIMEO option is not effective.

推荐答案

Boost.Asio对 boost::asio::use_future 值作为其完成处理程序,初始化函数将返回一个std::future对象,该对象将接收操作的结果.另外,如果操作以失败完成,则error_code会转换为system_error并通过future传递给调用方.

Boost.Asio's support for futures may provide an elegant solution. When an asynchronous operation is provided the boost::asio::use_future value as its completion handler, the initiating function will return a std::future object that will receive the result of the operation. Additionally, if the operation completes with failure, the error_code is converted into a system_error and passed to the caller via the future.

在Boost.Asio C ++ 11期货中工作时间客户端示例,专用线程运行io_service,主线程启动异步操作,然后同步等待操作完成,例如:

In the Boost.Asio C++11 Futures datytime client example, a dedicated thread runs the io_service, and the main thread initiates asynchronous operations then synchronously waits on the completion of the operations, such as below:

std::array<char, 128> recv_buf;
udp::endpoint sender_endpoint;
std::future<std::size_t> recv_length =
  socket.async_receive_from(
      boost::asio::buffer(recv_buf),
      sender_endpoint,
      boost::asio::use_future);

// Do other things here while the receive completes.

std::cout.write(
    recv_buf.data(),
    recv_length.get()); // Blocks until receive is complete.

使用future时,实现带有超时的同步读取的总体方法与以前相同.与其使用同步读取,不如使用异步读取并异步等待计时器.唯一的次要更改是,不是调用future::get()而不是阻塞io_service或定期检查谓词,而是调用future::get()进行阻塞,直到操作成功或失败(例如超时)为止.

When using futures, the overall approach to implementing a synchronous read with a timeout is the same as before. Instead of using a synchronous read, one would use an asynchronous read and asynchronously wait on a timer. The only minor change is that instead of instead of blocking on the io_service or periodically checking for the predicate, one would invoke future::get() to block until the operation has completed with success or failure (such as a timeout).

如果C ++ 11不可用,则可以为Boost.Thread的future自定义异步操作的返回类型,如

If C++11 is not available, then the return type for asynchronous operations can be customized for Boost.Thread's future, as demonstrated in this answer.

这篇关于我可以使用Boost.Asio在多线程I/O服务上超时地同步读取套接字吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆