如何使用Boost协程等待变量的更改? [英] How to co_await for a change in a variable using boost coroutine ts?

查看:22
本文介绍了如何使用Boost协程等待变量的更改?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

上下文
我使用Boost Coroutine、Boost ASIO和Boost Beast构建了一个Web服务器。 有一个用于阅读的协奏表和一个用于写作的协奏表。 有一个Message_to_Send队列,消息被推送到该队列中发送给用户。 编写协程检查Message_to_Send队列中是否有内容并将其发送。 发送写入后,协程挂起自身100毫秒,然后再次检查要写入的内容。

问题
写入协程每100毫秒轮询一次消息队列。我喜欢在某个计时器触发后找到不进行轮询的解决方案。

可行的解决方案
也许有一种解决方案,可以等待变量的变化。是否可能使用";Async_Initiate&Quot;创建Async_Wait_For_Callback?

代码示例
您可以克隆project。或使用此处发布的完整示例代码:

#include <algorithm>
#include <boost/asio.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/system_timer.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/bind/bind.hpp>
#include <boost/optional.hpp>
#include <chrono>
#include <cstddef>
#include <deque>
#include <exception>
#include <iostream>
#include <list>
#include <memory>
#include <set>
#include <stdexcept>
#include <string>
// TODO use cmake to find out if the compiler is gcc or clang
#include <coroutine> // enable if build with gcc
// #include <experimental/coroutine> //enable if build with clang

using namespace boost::beast;
using namespace boost::asio;
typedef boost::asio::use_awaitable_t<>::as_default_on_t<boost::asio::basic_waitable_timer<boost::asio::chrono::system_clock>> CoroTimer;
typedef boost::beast::websocket::stream<boost::beast::tcp_stream> Websocket;
using namespace boost::beast;
using namespace boost::asio;
using boost::asio::ip::tcp;
using tcp_acceptor = use_awaitable_t<>::as_default_on_t<tcp::acceptor>;
struct User
{
  boost::asio::awaitable<void> writeToClient (std::weak_ptr<Websocket> &connection);
  std::deque<std::string> msgQueue{};
  std::shared_ptr<CoroTimer> timer{};
};

void
handleMessage (std::string const &msg, std::list<std::shared_ptr<User>> &users, std::shared_ptr<User> user)
{
  std::cout << "please implement handle message" << std::endl;
  user->msgQueue.push_back ("please implement handle message");
  user->timer->cancel ();
}

boost::asio::awaitable<void>
User::writeToClient (std::weak_ptr<Websocket> &connection)
{
  try
    {
      while (not connection.expired ())
        {
          timer = std::make_shared<CoroTimer> (CoroTimer{ co_await this_coro::executor });
          timer->expires_after (std::chrono::system_clock::time_point::max () - std::chrono::system_clock::now ());
          try
            {
              co_await timer->async_wait ();
            }
          catch (boost::system::system_error &e)
            {
              using namespace boost::system::errc;
              if (operation_canceled == e.code ())
                {
                  // swallow cancel
                }
              else
                {
                  std::cout << "error in timer boost::system::errc: " << e.code () << std::endl;
                  abort ();
                }
            }
          while (not msgQueue.empty () && not connection.expired ())
            {
              auto tmpMsg = std::move (msgQueue.front ());
              std::cout << " msg: " << tmpMsg << std::endl;
              msgQueue.pop_front ();
              co_await connection.lock ()->async_write (buffer (tmpMsg), use_awaitable);
            }
        }
    }
  catch (std::exception &e)
    {
      std::cout << "write Exception: " << e.what () << std::endl;
    }
}

class Server
{
public:
  Server (boost::asio::ip::tcp::endpoint const &endpoint);

  boost::asio::awaitable<void> listener ();

private:
  void removeUser (std::list<std::shared_ptr<User>>::iterator user);
  boost::asio::awaitable<std::string> my_read (Websocket &ws_);

  boost::asio::awaitable<void> readFromClient (std::list<std::shared_ptr<User>>::iterator user, Websocket &connection);

  boost::asio::ip::tcp::endpoint _endpoint{};
  std::list<std::shared_ptr<User>> users{};
};

namespace this_coro = boost::asio::this_coro;

Server::Server (boost::asio::ip::tcp::endpoint const &endpoint) : _endpoint{ endpoint } {}

awaitable<std::string>
Server::my_read (Websocket &ws_)
{
  std::cout << "read" << std::endl;
  flat_buffer buffer;
  co_await ws_.async_read (buffer, use_awaitable);
  auto msg = buffers_to_string (buffer.data ());
  std::cout << "number of letters '" << msg.size () << "' msg: '" << msg << "'" << std::endl;
  co_return msg;
}

awaitable<void>
Server::readFromClient (std::list<std::shared_ptr<User>>::iterator user, Websocket &connection)
{
  try
    {
      for (;;)
        {
          auto readResult = co_await my_read (connection);
          handleMessage (readResult, users, *user);
        }
    }
  catch (std::exception &e)
    {
      removeUser (user);
      std::cout << "read Exception: " << e.what () << std::endl;
    }
}

void
Server::removeUser (std::list<std::shared_ptr<User>>::iterator user)
{
  users.erase (user);
}

awaitable<void>
Server::listener ()
{
  auto executor = co_await this_coro::executor;
  tcp_acceptor acceptor (executor, _endpoint);
  for (;;)
    {
      try
        {
          auto socket = co_await acceptor.async_accept ();
          auto connection = std::make_shared<Websocket> (std::move (socket));
          users.emplace_back (std::make_shared<User> ());
          std::list<std::shared_ptr<User>>::iterator user = std::next (users.end (), -1);
          connection->set_option (websocket::stream_base::timeout::suggested (role_type::server));
          connection->set_option (websocket::stream_base::decorator ([] (websocket::response_type &res) { res.set (http::field::server, std::string (BOOST_BEAST_VERSION_STRING) + " websocket-server-async"); }));
          co_await connection->async_accept (use_awaitable);
          co_spawn (
              executor, [connection, this, &user] () mutable { return readFromClient (user, *connection); }, detached);
          co_spawn (
              executor, [connectionWeakPointer = std::weak_ptr<Websocket>{ connection }, &user] () mutable { return user->get ()->writeToClient (connectionWeakPointer); }, detached);
        }
      catch (std::exception &e)
        {
          std::cout << "Server::listener () connect  Exception : " << e.what () << std::endl;
        }
    }
}

auto const DEFAULT_PORT = u_int16_t{ 55555 };

int
main ()
{
  try
    {
      using namespace boost::asio;
      io_context io_context (1);
      signal_set signals (io_context, SIGINT, SIGTERM);
      signals.async_wait ([&] (auto, auto) { io_context.stop (); });
      auto server = Server{ { ip::tcp::v4 (), DEFAULT_PORT } };
      co_spawn (
          io_context, [&server] { return server.listener (); }, detached);
      io_context.run ();
    }
  catch (std::exception &e)
    {
      std::printf ("Exception: %s
", e.what ());
    }
  return 0;
}

编辑:根据sehe的想法更新代码,该想法标记为答案。

推荐答案

经典线程解决方案将是条件变量。当然,这不是您想要的--我看到您甚至显式禁用了ASIO线程。很好。

除了提供ASIO服务来实现此行为之外,还有一种方法是使用计时器来模拟条件变量。您可以使用从不过期的计时器(Deadline在TimePoint::Max()),并手动将其重置为timepoint::min()(取消任何ASSYNC_WAIT)或过去的任何时间来表示该条件。然后,您可以将Timer::async_waituse_awaitable一起使用,就像您已经知道如何使用一样。

请注意,您仍需要手动发出更改信号。这就是您想要的,因为其他任何事情都需要内核进程跟踪支持/硬件调试器工具,这需要巨大的特权,而且往往非常慢。

您可能想知道如何将use_awaitable关联为绑定到计时器的执行器的默认完成令牌。例如,参见示例:https://www.boost.org/doc/libs/1_78_0/doc/html/boost_asio/example/cpp17/coroutines_ts/echo_server_with_default.cpp(HTML文档不链接这些示例)

这篇关于如何使用Boost协程等待变量的更改?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆