有什么方法可以异步等待Boost Asio的未来吗? [英] Is there any way to asynchronously wait for a future in Boost Asio?

查看:60
本文介绍了有什么方法可以异步等待Boost Asio的未来吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的问题如下.我异步地开始了几个操作,我想继续直到所有这些操作完成为止.使用Boost Asio,最简单的方法如下.假设tasks是某种支持某些异步操作的对象的容器.

My problem is the following. I start several operations asynchronously, and I want to continue until all of them are finished. Using Boost Asio, the most straightforward way to do this is the following. Suppose tasks is some kind of container of objects that support some asynchronous operation.

tasksToGo = tasks.size();
for (auto task: tasks) {
    task.async_do_something([](const boost::system::error_code& ec)
    {
        if (ec) {
            // handle error
        } else {
            if (--taslsToGo == 0) {
                 tasksFinished();
            }
        }
    });
}

此解决方案的问题在于,这似乎是一种解决方法.在Boost 1.54中,我可以使用期货来做到这一点,但是我只能同步等待,这只能从与run()调用位置分开的线程中进行.

The problem with this solution is that it feels like a workaround. In Boost 1.54 I can do it with futures but I can only wait synchronously, which is only possible from a thread separate from where run() is called.

for (auto task: tasks) {
    futures.push_back(task.async_do_something(boost::asio::use_future));
}

for (auto future: futures) {
    future.wait();
}

此代码比上一个代码清晰得多,但是我需要一个我不想要的单独线程.我想要可以这样使用的东西:

This code is much clearer than the previous one, but I need a separate thread which I don't want. I want something that can be used like this:

for (auto task: tasks) {
    futures.push_back(task.async_do_something(boost::asio::use_future));
}

boost::asio::spawn(ioService, [](boost::asio::yield_context yield)
{
    for (auto future: futures) {
        future.async_wait(yield);
    }
    tasksFinished();

}

有什么可以类似使用的东西吗?

Is there anything that can be used similarly?

推荐答案

据我所知,目前尚无对此的一流支持.但是,考虑到库的方向,如果将来无法使用此功能,我会感到惊讶.

As far as I know, there is currently no first-class support for this. However, given the direction of the library, I would be surprised if this functionality was not available in the future.

已经提出了一些论文来增加对这种功能的支持:

A few papers have been proposed to add support for this type of functionality:

  • N3558 - A Standardized Representation of Asynchronous Operations is particularly interesting. It proposes when_all(futures) and future.next(). If it is implemented, then it would be possible to represent the asynchronous chain as:

for (auto task: tasks) {
    futures.push_back(task.async_do_something(boost::asio::use_future));
}
when_all(futures).then(&tasksFinished);

  • N3562-执行程序和调度程序介绍执行者.可以用来对async可以执行的上下文提供更好的控制.对于Boost.Asio,这可能需要提供某种类型的执行程序,以适应io_service.
  • N3562 - Executors and schedulers introduces executors. Which can be used to provided finer control as to the context in which an async can execute. For Boost.Asio, this would likely require providing some type of executor that defers to the io_service.
  • 尽管这些论文仍在进行中,但值得定期检查Boost.Thread的 github 以便这些建议的早期改编.

    While these papers are still ongoing, it may be worthwhile to periodically check Boost.Thread's Conformance and Extension page and Boost.Asio's github for early adaptations of these proposals.

    一年前,我需要使用此功能,而早期版本的Boost则需要我自己的解决方案.关于语义,仍然存在一些粗糙的领域,但是在采用某种官方方法之前,作为参考材料可能会有所帮助.在提供代码之前,这是一个基于您的问题的示例应用程序:

    I had the need for this functionality a year ago with a much earlier version of Boost so worked on my own solution. There are still some rough areas with regards to the semantics, but it may be helpful as a reference material until something official is adopted. Before I provide the code, here is an example application based on your question:

    #include <iostream>
    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    #include "async_ops.hpp"
    
    void handle_timer(const boost::system::error_code& error, int x)
    {
      std::cout << "in handle timer: " << x << " : "
                << error.message() <<  std::endl;
    }
    
    void a() { std::cout << "a" << std::endl; }
    void b() { std::cout << "b" << std::endl; }
    void c() { std::cout << "c" << std::endl; }
    
    int main()
    {
      boost::asio::io_service io_service;
      boost::asio::deadline_timer timer1(io_service);
      boost::asio::deadline_timer timer2(io_service);
    
      // Create a chain that will continue once 2 handlers have been executed.
      chain all_expired = when_all(io_service, 2);
    
      all_expired.then(&a)  // Once 2 handlers finish, run a within io_service.
                 .then(&b)  // Once a has finished, run b within io_service.
                 .then(&c); // Once b has finished, run c within io_service.
    
      // Set expiration times for timers.
      timer1.expires_from_now(boost::posix_time::seconds(2));
      timer2.expires_from_now(boost::posix_time::seconds(5));
    
      // Asynchrnously wait for the timers, wrapping the handlers with the chain.
      timer1.async_wait(all_expired.wrap(
          boost::bind(&handle_timer, boost::asio::placeholders::error, 1)));
      timer2.async_wait(all_expired.wrap(
          boost::bind(&handle_timer, boost::asio::placeholders::error, 2)));
    
      // Run the io_service.
      io_service.run();
    }
    

    哪个会产生以下输出:

    in handle timer: 1 : Success
    in handle timer: 2 : Success
    a
    b
    c
    

    这是async_ops.hpp:

    #include <vector>
    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    #include <boost/bind/protect.hpp>
    #include <boost/enable_shared_from_this.hpp>
    #include <boost/foreach.hpp>
    #include <boost/function.hpp>
    #include <boost/make_shared.hpp>
    #include <boost/range/iterator_range.hpp>
    #include <boost/shared_ptr.hpp>
    #include <boost/thread/locks.hpp>
    #include <boost/thread/mutex.hpp>
    #include <boost/type_traits/is_integral.hpp>
    #include <boost/type_traits/remove_reference.hpp>
    #include <boost/utility/enable_if.hpp>
    
    class chain;
    
    namespace detail {
    
    /// @brief Chained handler connects two handlers together that will
    ///        be called sequentially.
    ///
    /// @note Type erasure is not performed on Handler1 to allow resolving
    ///       to the correct asio_handler_invoke via ADL.
    template <typename Handler1> 
    class chained_handler
    {
    public:
      template <typename Handler2>
      chained_handler(Handler1 handler1, Handler2 handler2)
        : handler1_(handler1),
          handler2_(handler2)
      {}
    
      void operator()()
      {
        handler1_();
        handler2_();
      }
    
      template <typename Arg1>
      void operator()(const Arg1& a1)
      {
        handler1_(a1);
        handler2_();
      }
    
      template <typename Arg1, typename Arg2>
      void operator()(const Arg1& a1, const Arg2& a2)
      {
        handler1_(a1, a2);
        handler2_();
      }
    
    //private:
      Handler1 handler1_;
      boost::function<void()> handler2_;
    };
    
    /// @brief Hook that allows the sequential_handler to be invoked
    ///        within specific context based on the hander's type.
    template <typename Function,
              typename Handler>
    void asio_handler_invoke(
      Function function,
      chained_handler<Handler>* handler)
    {
      boost_asio_handler_invoke_helpers::invoke(
        function, handler->handler1_);
    }
    
    /// @brief No operation.
    void noop() {}
    
    /// @brief io_service_executor is used to wrap handlers, providing a
    ///        deferred posting to an io_service.  This allows for chains
    ///        to inherit io_services from other chains.
    class io_service_executor
      : public boost::enable_shared_from_this<io_service_executor>
    {
    public:
      /// @brief Constructor.
      explicit
      io_service_executor(boost::asio::io_service* io_service)
        : io_service_(io_service)
      {}
    
      /// @brief Wrap a handler, returning a functor that will post the
      ///        provided handler into the io_service.
      ///
      /// @param handler Handler to be wrapped for deferred posting.
      /// @return Functor that will post handler into io_service.
      template <typename Handler>
      boost::function<void()> wrap(Handler handler)
      {
        // By binding to the io_service_exectuer's post, the io_service
        // into which the handler can be posted can be specified at a later 
        // point in time.
        return boost::bind(&io_service_executor::post<Handler>,
                           shared_from_this(), handler);
      }
    
      /// @brief Set the io_service.
      void io_service(boost::asio::io_service* io_service)
      {
        io_service_ = io_service;
      }
    
      /// @brief Get the io_service.
      boost::asio::io_service* io_service()
      {
        return io_service_;
      }
    
    private:
    
      /// @brief Post handler into the io_service.
      ///
      /// @param handler The handler to post.
      template <typename Handler>
      void post(Handler handler)
      {
        io_service_->post(handler);
      }
    
    private:
      boost::asio::io_service* io_service_;
    };
    
    /// @brief chain_impl is an implementation for a chain.  It is responsible
    ///        for lifetime management, tracking posting and wrapped functions,
    ///        as well as determining when run criteria has been satisfied.
    class chain_impl
      : public boost::enable_shared_from_this<chain_impl>
    {
    public:
    
      /// @brief Constructor.
      chain_impl(boost::shared_ptr<io_service_executor> executor,
                 std::size_t required)
        : executor_(executor),
          required_(required)
      {}
    
      /// @brief Destructor will invoke all posted handlers.
      ~chain_impl()
      {
        run();
      }
    
      /// @brief Post a handler that will be posted into the executor 
      ///        after run criteria has been satisfied.
      template <typename Handler>
      void post(const Handler& handler)
      {
        deferred_handlers_.push_back(executor_->wrap(handler));
      }
    
      /// @brief Wrap a handler, returning a chained_handler.  The returned
      ///        handler will notify the impl when it has been invoked.
      template <typename Handler>
      chained_handler<Handler> wrap(const Handler& handler)
      {
        return chained_handler<Handler>(
          handler,                                                 // handler1
          boost::bind(&chain_impl::complete, shared_from_this())); // handler2
      }
    
      /// @brief Force run of posted handlers.
      void run()
      {
        boost::unique_lock<boost::mutex> guard(mutex_);
        run(guard);
      }
    
      /// @brief Get the executor.
      boost::shared_ptr<io_service_executor> executor() { return executor_; }
    
    private:
    
      /// @brief Completion handler invoked when a wrapped handler has been
      ///        invoked.
      void complete()
      {
        boost::unique_lock<boost::mutex> guard(mutex_);
    
        // Update tracking.
        if (required_)
          --required_;
    
        // If criteria has not been met, then return early.
        if (required_) return;
    
        // Otherwise, run the handlers.
        run(guard);    
      }
    
      /// @brief Run handlers.
      void run(boost::unique_lock<boost::mutex>& guard)
      {
        // While locked, swap handlers into a temporary.
        std::vector<boost::function<void()> > handlers;
        using std::swap;
        swap(handlers, deferred_handlers_);
    
        // Run handlers without mutex.
        guard.unlock();
        BOOST_FOREACH(boost::function<void()>& handler, handlers)
            handler();
        guard.lock();
      }
    
    private:
      boost::shared_ptr<io_service_executor> executor_;
      boost::mutex mutex_;
      std::size_t required_;
      std::vector<boost::function<void()> > deferred_handlers_;
    };
    
    /// @brief Functor used to wrap and post handlers or chains between two
    ///        implementations.
    struct wrap_and_post
    {
      wrap_and_post(
        boost::shared_ptr<detail::chain_impl> current,
        boost::shared_ptr<detail::chain_impl> next
      )
        : current_(current),
          next_(next)
      {}
    
      /// @brief Wrap a handler with next, then post into current.
      template <typename Handler>
      void operator()(Handler handler)
      {
        // Wrap the handler with the next implementation, then post into the
        // current.  The wrapped handler will keep next alive, and posting into
        // current will cause next::complete to be invoked when current is ran.
        current_->post(next_->wrap(handler));
      }
    
      /// @brief Wrap an entire chain, posting into the current.
      void operator()(chain chain);
    
    private:
      boost::shared_ptr<detail::chain_impl> current_;
      boost::shared_ptr<detail::chain_impl> next_;
    };
    
    } // namespace detail
    
    /// @brief Used to indicate that the a chain will inherit its service from an
    ///        outer chain.
    class inherit_service_type {};
    inherit_service_type inherit_service;
    
    /// @brief Chain represents an asynchronous call chain, allowing the overall
    ///        chain to be constructed in a verbose and explicit manner.
    class chain
    {
    public:
    
      /// @brief Constructor.
      ///
      /// @param io_service The io_service in which the chain will run.
      explicit
      chain(boost::asio::io_service& io_service)
        : impl_(boost::make_shared<detail::chain_impl>(
                  boost::make_shared<detail::io_service_executor>(&io_service),
                  0)),
          root_impl_(impl_)
      {}
    
      /// @brief Constructor.  The chain will inherit its io_service from an
      ///        outer chain.
      explicit
      chain(inherit_service_type)
        : impl_(boost::make_shared<detail::chain_impl>(
                  boost::make_shared<detail::io_service_executor>(
                    static_cast<boost::asio::io_service*>(NULL)),
                  0)),
          root_impl_(impl_)
      {}
    
      /// @brief Force run posted handlers.
      void run()
      {
        root_impl_->run();
      }
    
      /// @brief Chain link that will complete when the amount of wrapped
      ///        handlers is equal to required.
      ///
      /// @param required The amount of handlers required to be complete.
      template <typename T>
      typename boost::enable_if<boost::is_integral<
        typename boost::remove_reference<T>::type>, chain>::type
      any(std::size_t required = 1)
      {
        return chain(root_impl_, required);
      }
    
      /// @brief Chain link that wraps all handlers in container, and will
      ///        be complete when the amount of wrapped handlers is equal to
      ///        required.
      ///
      /// @param Container of handlers to wrap.
      /// @param required The amount of handlers required to be complete.
      template <typename Container>
      typename boost::disable_if<boost::is_integral<
        typename boost::remove_reference<Container>::type>, chain>::type
      any(const Container& container, 
          std::size_t required = 1)
      {
        return post(container, required);
      }
    
      /// @brief Chain link that wraps all handlers in iterator range, and will
      ///        be complete when the amount of wrapped handlers is equal to
      ///        required.
      ///
      /// @param Container of handlers to wrap.
      /// @param required The amount of handlers required to be complete.
      template <typename Iterator>
      chain any(Iterator begin, Iterator end,
                std::size_t required = 1)
      {
        return any(boost::make_iterator_range(begin, end), required);
      }
    
      /// @brief Chain link that will complete when the amount of wrapped
      ///        handlers is equal to required.
      ///
      /// @param required The amount of handlers required to be complete.
      template <typename T>
      typename boost::enable_if<boost::is_integral<
        typename boost::remove_reference<T>::type>, chain>::type
      all(T required)
      {
        return any<T>(required);
      }
    
      /// @brief Chain link that wraps all handlers in container, and will
      ///        be complete when all wrapped handlers from the container 
      ///        have been executed.
      ///
      /// @param Container of handlers to wrap.
      template <typename Container>
      typename boost::disable_if<boost::is_integral<
        typename boost::remove_reference<Container>::type>, chain>::type
      all(const Container& container)
      {
        return any(container, container.size());
      }
    
      /// @brief Chain link that wraps all handlers in iterator range, and will
      ///        be complete when all wrapped handlers from the iterator range
      ///        have been executed.
      ///
      /// @param Container of handlers to wrap.
      template <typename Iterator>
      chain all(Iterator begin, Iterator end)
      {
        return all(boost::make_iterator_range(begin, end));
      }
    
      /// @brief Chain link that represents a single sequential link.
      template <typename Handler>
      chain then(const Handler& handler)
      {
        boost::array<Handler, 1> handlers = {{handler}};
        return all(handlers);
      }
    
      /// @brief Wrap a handler, returning a chained_handler.
      template <typename Handler>
      detail::chained_handler<Handler> wrap(const Handler& handler)
      {
        return impl_->wrap(handler);
      }
    
      /// @brief Set the executor.
      void executor(boost::asio::io_service& io_service)
      {
        impl_->executor()->io_service(&io_service);
      }
    
      /// @brief Check if this chain should inherit its executor.
      bool inherits_executor()
      {
        return !impl_->executor()->io_service();
      }
    
    private:
    
      /// @brief Private constructor used to create links in the chain.
      ///
      /// @note All links maintain a handle to the root impl.  When constructing a
      ///       chain, this allows for links later in the chain to be stored as
      ///       non-temporaries.
      chain(boost::shared_ptr<detail::chain_impl> root_impl,
            std::size_t required)
        : impl_(boost::make_shared<detail::chain_impl>(
                  root_impl->executor(), required)),
          root_impl_(root_impl)
      {}
    
      /// @brief Create a new chain link, wrapping handlers and posting into
      ///        the current chain.
      template <typename Container>
      chain post(const Container& container,
                 std::size_t required)
      {
        // Create next chain.
        chain next(root_impl_, required);
    
        // Wrap handlers from the next chain, and post into the current chain.
        std::for_each(container.begin(), container.end(),
                      detail::wrap_and_post(impl_, next.impl_));
    
        return next;
      }
    
    private:
      boost::shared_ptr<detail::chain_impl> impl_;
      boost::shared_ptr<detail::chain_impl> root_impl_;
    };
    
    void detail::wrap_and_post::operator()(chain c)
    {
      // If next does not have an executor, then inherit from current.
      if (c.inherits_executor())
          c.executor(*current_->executor()->io_service());
    
      // When current completes, start the chain.
      current_->post(boost::protect(boost::bind(&chain::run, c)));
    
      // The next impl needs to be aware of when the chain stops, so
      // wrap a noop and append it to the end of the chain.
      c.then(next_->wrap(&detail::noop));  
    }
    
    // Convenience functions.
    template <typename T, typename Handler>
    chain async(T& t, const Handler& handler)
    {
      return chain(t).then(handler);
    }
    
    template <typename T,
              typename Container>
    chain when_all(T& t, const Container& container)
    {
      return chain(t).all(container);
    }
    
    template <typename T,
              typename Iterator>
    chain when_all(T& t, Iterator begin, Iterator end)
    {
      return chain(t).all(begin, end);
    }
    
    template <typename T,
              typename Container>
    chain when_any(T& t, const Container& container)
    {
      return chain(t).any(container);
    }
    
    template <typename T,
              typename Iterator>
    chain when_any(T& t, Iterator begin, Iterator end)
    {
      return chain(t).any(begin, end);
    }
    


    这里有一些基本的高级示例,它们使用上面的代码有两个线程.我的记法:


    Here are some basic to advance examples using the above code with two threads. My notation:

    • a -> b表示a,然后表示b
    • (a | b)表示ab.因此,(a | b) -> c表示ab完成时,然后运行c.
    • (a & b)表示ab.因此,(a & b) -> c表示当ab都完成时,然后运行c.
    • a -> b expresses a then b
    • (a | b) expresses a or b. Thus (a | b) -> c implies when either a or b finish, then run c.
    • (a & b) expresses a and b. Thus (a & b) -> c implies when both a and b finish, then run c.

    在每种情况之前,我都要打印链的符号.此外,每个函数在输入时都会打印一个大写字母,在退出时打印一个小写字母.

    Before each case, I print the chain's notation. Additionally, each function will print a capital letter when entering, and a lower letter when exiting.

    #include <iostream>
    #include <boost/asio.hpp>
    #include <boost/assign.hpp>
    #include <boost/thread.hpp>
    #include "async_ops.hpp"
    
    /// @brief Print identifiers when entering and exiting scope,
    ///        sleeping between.
    void print_and_sleep(char id, unsigned int sleep_time)
    {
      std::cout << char(toupper(id));
      boost::this_thread::sleep_for(boost::chrono::milliseconds(sleep_time));
      std::cout << char(tolower(id));
      std::cout.flush();
    }
    
    /// @brief Convenience function to create functors.
    boost::function<void()> make_fn(char id, unsigned int sleep_time)
    {
      return boost::bind(&print_and_sleep, id, sleep_time);  
    }
    
    /// @brief Run an io_service with multiple threads.
    void run_service(boost::asio::io_service& io_service)
    {
      boost::thread_group threads;
      threads.create_thread(boost::bind(
        &boost::asio::io_service::run, &io_service));
      io_service.run();
      threads.join_all();
    }
    
    int main()
    {
      boost::function<void()> a = make_fn('a', 500);
      boost::function<void()> b = make_fn('b', 1000);
      boost::function<void()> c = make_fn('c', 500);
      boost::function<void()> d = make_fn('d', 1000);
      boost::function<void()> e = make_fn('e', 500);
    
      {
        std::cout << "a -> b -> c\n"
                     "  ";
        boost::asio::io_service io_service;
        async(io_service, a)
          .then(b)
          .then(c);
        run_service(io_service);
        std::cout << std::endl;
      }
    
      {
        std::cout << "(a & b) -> c\n"
                     "  ";
        boost::asio::io_service io_service;
        when_all(io_service, boost::assign::list_of(a)(b))
          .then(c); 
        run_service(io_service);
        std::cout << std::endl;
      }
    
      {
        std::cout << "(a | b) -> c\n"
                     "  ";
        boost::asio::io_service io_service;
        when_any(io_service, boost::assign::list_of(a)(b))
          .then(c); 
        run_service(io_service);
        std::cout << std::endl;
      }
    
      {
        std::cout << "(a & b) -> (c & d)\n"
                     "  ";
        boost::asio::io_service io_service;
        when_all(io_service, boost::assign::list_of(a)(b))
          .all(boost::assign::list_of(c)(d));
        run_service(io_service);
        std::cout << std::endl;
      }
    
      {
        std::cout << "(a & b) -> c -> (d & e)\n"
                     "  ";
        boost::asio::io_service io_service;
        when_all(io_service, boost::assign::list_of(a)(b))
          .then(c)
          .all(boost::assign::list_of(d)(e));
        run_service(io_service);
        std::cout << std::endl;
      }
    
      std::cout << "(a & b) -> (c & d) -> e\n"
                   "  ";
      {
        boost::asio::io_service io_service;
        when_all(io_service, boost::assign::list_of(a)(b))
          .all(boost::assign::list_of(c)(d))
          .then(e);
        run_service(io_service);
        std::cout << std::endl;
      }
    
      std::cout << "(a | b) -> (c | d) -> e\n"
                   "  ";
      {
        boost::asio::io_service io_service;
        when_any(io_service, boost::assign::list_of(a)(b))
          .any(boost::assign::list_of(c)(d))
          .then(e);
        run_service(io_service);
        std::cout << std::endl;
      }
    
      std::cout << "(a | b) -> (c & d) -> e\n"
                   "  ";
      {
        boost::asio::io_service io_service;
        when_any(io_service, boost::assign::list_of(a)(b))
          .all(boost::assign::list_of(c)(d))
          .then(e);
        run_service(io_service);
        std::cout << std::endl;
      }
    
      {
        std::cout << "a -> ((b -> d) | c) -> e\n"
                     "  ";
        boost::asio::io_service io_service;
        async(io_service, a)
          .any(boost::assign::list_of
               (async(io_service, b).then(d))
               (async(inherit_service, c)))
          .then(e);
        run_service(io_service);
        std::cout << std::endl;
      }
    }
    

    产生以下输出:

    a -> b -> c
      AaBbCc
    (a & b) -> c
      ABabCc
    (a | b) -> c
      ABaCbc
    (a & b) -> (c & d)
      ABabCDcd
    (a & b) -> c -> (d & e)
      ABabCcDEed
    (a & b) -> (c & d) -> e
      ABabCDcdEe
    (a | b) -> (c | d) -> e
      ABaCbDcEed
    (a | b) -> (c & d) -> e
      ABaCbDcdEe
    a -> ((b -> d) | c) -> e
      AaBCcEbDed
    

    这篇关于有什么方法可以异步等待Boost Asio的未来吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆