将std :: packaged_task添加到现有线程? [英] Add a std::packaged_task to an existing thread?

查看:90
本文介绍了将std :: packaged_task添加到现有线程?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

是否存在将 std :: packaged_task 添加到现有线程的标准方法?在运行任务之前,必须要承担大量的开销,因此我想执行一次,然后保持线程运行并等待任务执行。我希望能够使用期货,以便可以选择获取任务的结果并捕获异常。

Is there an standard way to add a std::packaged_task to an existing thread? There's a nontrivial amount of overhead that must happen before the task is run, so I want to do that once, then keep the thread running and waiting for tasks to execute. I want to be able to use futures so I can optionally get the result of the task and catch exceptions.

我的C ++ 11以前的实现要求我的任务继承从具有 Run()方法的抽象基类(有点痛苦,不能使用lambda),并具有 std ::我在主线程中添加到队列中的deque 集合,在工作线程中从队列中退出队列。我必须保护该集合免于同时访问,并向工作线程提供信号,表明有事情要做,所以它不会旋转或休眠。入队返回带有同步对象以等待任务完成的结果对象以及结果值。一切正常,但是如果有更好的东西,是时候进行升级了。

My pre-C++11 implementation requires my tasks to inherit from an abstract base class with a Run() method (a bit of a pain, can't use lambdas), and having a std::deque collection of those that I add to in the main thread and dequeue from in the worker thread. I have to protect that collection from simultaneous access and provide a signal to the worker thread that there's something to do so it isn't spinning or sleeping. Enqueing something returns a "result" object with a synchronization object to wait for the task to complete, and a result value. It all works well but it's time for an upgrade if there's something better.

推荐答案

这里是一个玩具线程池:

Here is a toy thread pool:

template<class T>
struct threaded_queue {
  using lock = std::unique_lock<std::mutex>;
  void push_back( T t ) {
    {
      lock l(m);
      data.push_back(std::move(t));
    }
    cv.notify_one();
  }
  boost::optional<T> pop_front() {
    lock l(m);
    cv.wait(l, [this]{ return abort || !data.empty(); } );
    if (abort) return {};
    auto r = std::move(data.back());
    data.pop_back();
    return std::move(r);
  }
  void terminate() {
    {
      lock l(m);
      abort = true;
      data.clear();
    }
    cv.notify_all();
  }
  ~threaded_queue()
  {
    terminate();
  }
private:
  std::mutex m;
  std::deque<T> data;
  std::condition_variable cv;
  bool abort = false;
};
struct thread_pool {
  thread_pool( std::size_t n = 1 ) { start_thread(n); }
  thread_pool( thread_pool&& ) = delete;
  thread_pool& operator=( thread_pool&& ) = delete;
  ~thread_pool() = default; // or `{ terminate(); }` if you want to abandon some tasks
  template<class F, class R=std::result_of_t<F&()>>
  std::future<R> queue_task( F task ) {
    std::packaged_task<R()> p(std::move(task));
    auto r = p.get_future();
    tasks.push_back( std::move(p) );
    return r;
  }
  template<class F, class R=std::result_of_t<F&()>>
  std::future<R> run_task( F task ) {
    if (threads_active() >= total_threads()) {
      start_thread();
    }
    return queue_task( std::move(task) );
  }
  void terminate() {
    tasks.terminate();
  }
  std::size_t threads_active() const {
    return active;
  }
  std::size_t total_threads() const {
    return threads.size();
  }
  void clear_threads() {
    terminate();
    threads.clear();
  }
  void start_thread( std::size_t n = 1 ) {
    while(n-->0) {
      threads.push_back(
        std::async( std::launch::async,
          [this]{
            while(auto task = tasks.pop_front()) {
              ++active;
              try{
                (*task)();
              } catch(...) {
                --active;
                throw;
              }
              --active;
            }
          }
        )
      );
    }
  }
private:
  std::vector<std::future<void>> threads;
  threaded_queue<std::packaged_task<void()>> tasks;
  std::atomic<std::size_t> active;
};

我的另一个答案

带有一个线程的 thread_pool 与您的描述非常匹配。

A thread_pool with 1 thread matches your description pretty much.

上面只是一个玩具,一个真正的线程池,我将替换 std :: packaged_task< void()> move_only_function< void()> ,这就是我使用的全部功能。 (如果无效, packaged_task< void()> 可以有趣地容纳 packaged_task< R()>

The above is only a toy, a real thread pool I'd replace the std::packaged_task<void()> with a move_only_function<void()>, which is all I use it for. (A packaged_task<void()> can hold a packaged_task<R()> amusingly, if inefficiencly).

您将不得不考虑关机并制定计划。如果您尝试先关闭线程而不先清除线程,则上面的代码将锁定。

You will have to reason about shutdown and make a plan. The above code locks up if you try to shut it down without first clearing the threads.

这篇关于将std :: packaged_task添加到现有线程?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆