线程池使用boost asio [英] Thread pool using boost asio

查看:148
本文介绍了线程池使用boost asio的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图创建一个有限的线程池类使用boost :: asio。

I am trying to create a limited thread pool class using boost::asio. But I am stuck at one point can some one help me.

唯一的问题是我应该减少计数器的地方?

The only problem is the place where I should decrease counter?

代码不能正常工作。

问题是我不知道什么时候我的线程将完成执行,我将如何知道它已返回池

the problem is I don't know when my thread will finish execution and how I will come to know that it has return to pool

#include <boost/asio.hpp>
#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>
#include <stack>

using namespace std;
using namespace boost;

class ThreadPool
{
    static int count;
    int NoOfThread;
    thread_group grp;
    mutex mutex_;
    asio::io_service io_service;
    int counter;
    stack<thread*> thStk ;

public:
    ThreadPool(int num)
    {   
        NoOfThread = num;
        counter = 0;
        mutex::scoped_lock lock(mutex_);

        if(count == 0)
            count++;
        else
            return;

        for(int i=0 ; i<num ; ++i)
        {
            thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service)));
        }
    }
    ~ThreadPool()
    {
        io_service.stop();
        grp.join_all();
    }

    thread* getThread()
    {
        if(counter > NoOfThread)
        {
            cout<<"run out of threads \n";
            return NULL;
        }

        counter++;
        thread* ptr = thStk.top();
        thStk.pop();
        return ptr;
    }
};
int ThreadPool::count = 0;


struct callable
{
    void operator()()
    {
        cout<<"some task for thread \n";
    }
};

int main( int argc, char * argv[] )
{

    callable x;
    ThreadPool pool(10);
    thread* p = pool.getThread();
    cout<<p->get_id();

    //how i can assign some function to thread pointer ?
    //how i can return thread pointer after work done so i can add 
//it back to stack?


    return 0;
}


推荐答案

使用另一个函数包装用户提供的任务:

In short, you need to wrap the user's provided task with another function that will:


  • 调用用户函数或可调用对象。

  • 锁定互斥体并递减计数器。

这个线程池的要求。因此,为了清楚起见,这里是一个明确的列表,我相信是什么要求:

I may not be understanding all the requirements for this thread pool. Thus, for clarity, here is an explicit list as to what I believe are the requirements:


  • 池管理线程的生命周期。

  • 用户可以以非侵入方式将任务分配给池。

  • 正在分配任务时,如果池中的所有主题都正在运行其他任务,则会舍弃该任务。

在我提供实现之前,有几个要点我想强调:

Before I provide an implementation, there are a few key points I would like to stress:


  • 一旦线程启动,运行直到完成,取消或终止。线程正在执行的函数不能重新分配。为了允许单个线程在其生命周期中执行多个函数,线程将希望使用将从队列中读取的函数来启动,例如 io_service :: run(),并且可调用类型被发布到事件队列中,例如从 io_service :: post()。 如果 io_service 中没有待处理的工作,

  • io_service :: run $ c>, io_service 被停止,或者线程正在运行的处理程序抛出异常。为了防止 io_serivce :: run()从没有未完成的工作时返回, io_service :: work

  • 定义任务的类型要求(即任务的类型必须可通过 object()语法调用),而不是类型(即任务必须继承 process ),为用户提供了更多的灵活性。它允许用户提供作为函数指针的任务或提供nullary operator()的类型。

  • Once a thread has been launched, it will run until completion, cancellation, or termination. The function the thread is executing cannot be reassigned. To allow for a single thread to execute multiple functions over the course of its life, the thread will want to launch with a function that will read from a queue, such as io_service::run(), and callable types are posted into the event queue, such as from io_service::post().
  • io_service::run() returns if there is no work pending in the io_service, the io_service is stopped, or an exception is thrown from a handler that the thread was running. To prevent io_serivce::run() from returning when there is no unfinished work, the io_service::work class can be used.
  • Defining the task's type requirements (i.e. the task's type must be callable by object() syntax) instead of requiring a type (i.e. task must inherit from process), provides more flexibility to the user. It allows the user to supply a task as a function pointer or a type providing a nullary operator().

使用 boost :: asio 实现:

#include <boost/asio.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
  boost::asio::io_service io_service_;
  boost::asio::io_service::work work_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : work_( io_service_ ),
      available_( pool_size )
  {
    for ( std::size_t i = 0; i < pool_size; ++i )
    {
      threads_.create_thread( boost::bind( &boost::asio::io_service::run,
                                           &io_service_ ) );
    }
  }

  /// @brief Destructor.
  ~thread_pool()
  {
    // Force all threads to return from io_service::run().
    io_service_.stop();

    // Suppress all exceptions.
    try
    {
      threads_.join_all();
    }
    catch ( const std::exception& ) {}
  }

  /// @brief Adds a task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Post a wrapped task into the queue.
    io_service_.post( boost::bind( &thread_pool::wrap_task, this,
                                   boost::function< void() >( task ) ) );
  }

private:
  /// @brief Wrap a task so that the available count can be increased once
  ///        the user provided task has completed.
  void wrap_task( boost::function< void() > task )
  {
    // Run the user supplied task.
    try
    {
      task();
    }
    // Suppress all exceptions.
    catch ( const std::exception& ) {}

    // Task has finished, so increment count of available threads.
    boost::unique_lock< boost::mutex > lock( mutex_ );
    ++available_;
  }
};

关于实施的几点意见:


  • 异常处理需要发生在用户的任务周围。如果用户的函数或可调用对象抛出的异常不是 boost :: thread_interrupted ,那么 std :: terminate()被调用。这是Boost.Thread的线程函数中的异常行为。还值得阅读Boost.Asio的从处理程序抛出的异常效应

  • 如果用户通过<$ c提供任务 $ c> boost :: bind ,则嵌套的 boost :: bind 将无法编译。需要以下选项之一:


    • 不支持创建的任务如果的结果,则基于用户的类型来执行编译时分支的元编程:
    • boost :: bind ,以便可以使用 boost :: protect ,因为 boost :: protect 只能在某些功能对象上正常运行。
    • 使用另一种类型间接传递任务对象。我选择使用 boost :: function 以获得可读性,代价是丢失确切的类型。 boost :: tuple ,虽然稍微不太可读,但也可以用于保留确切的类型,如Boost.Asio的序列化示例。

    • Exception handling needs to occur around the user's task. If the user's function or callable object throws an exception that is not of type boost::thread_interrupted, then std::terminate() is called. This is the the result of Boost.Thread's exceptions in thread functions behavior. It is also worth reading Boost.Asio's effect of exceptions thrown from handlers.
    • If the user provides the task via boost::bind, then the nested boost::bind will fail to compile. One of the following options is required:
      • Not support task created by boost::bind.
      • Meta-programming to perform compile-time branching based on whether or not the user's type if the result of boost::bind so that boost::protect could be used, as boost::protect only functions properly on certain function objects.
      • Use another type to pass the task object indirectly. I opted to use boost::function for readability at the cost of losing the exact type. boost::tuple, while slightly less readable, could also be used to preserve the exact type, as seen in the Boost.Asio's serialization example.

      应用程序代码现在可以非干扰地使用 thread_pool p>

      Application code can now use the thread_pool type non-intrusively:

      void work() {};
      
      struct worker
      {
        void operator()() {};
      };
      
      void more_work( int ) {};
      
      int main()
      { 
        thread_pool pool( 2 );
        pool.run_task( work );                        // Function pointer.
        pool.run_task( worker() );                    // Callable object.
        pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
      }
      

      thread_pool 在没有Boost.Asio的情况下创建,并且对于维护者来说可能更容易一些,因为他们不再需要了解 Boost.Asio 行为,例如 io_service :: run() return,什么是 io_service :: work 对象:

      The thread_pool could be created without Boost.Asio, and may be slightly easier for maintainers, as they no longer need to know about Boost.Asio behaviors, such as when does io_service::run() return, and what is the io_service::work object:

      #include <queue>
      #include <boost/bind.hpp>
      #include <boost/thread.hpp>
      
      class thread_pool
      {
      private:
        std::queue< boost::function< void() > > tasks_;
        boost::thread_group threads_;
        std::size_t available_;
        boost::mutex mutex_;
        boost::condition_variable condition_;
        bool running_;
      public:
      
        /// @brief Constructor.
        thread_pool( std::size_t pool_size )
          : available_( pool_size ),
            running_( true )
        {
          for ( std::size_t i = 0; i < pool_size; ++i )
          {
            threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
          }
        }
      
        /// @brief Destructor.
        ~thread_pool()
        {
          // Set running flag to false then notify all threads.
          {
            boost::unique_lock< boost::mutex > lock( mutex_ );
            running_ = false;
            condition_.notify_all();
          }
      
          try
          {
            threads_.join_all();
          }
          // Suppress all exceptions.
          catch ( const std::exception& ) {}
        }
      
        /// @brief Add task to the thread pool if a thread is currently available.
        template < typename Task >
        void run_task( Task task )
        {
          boost::unique_lock< boost::mutex > lock( mutex_ );
      
          // If no threads are available, then return.
          if ( 0 == available_ ) return;
      
          // Decrement count, indicating thread is no longer available.
          --available_;
      
          // Set task and signal condition variable so that a worker thread will
          // wake up andl use the task.
          tasks_.push( boost::function< void() >( task ) );
          condition_.notify_one();
        }
      
      private:
        /// @brief Entry point for pool threads.
        void pool_main()
        {
          while( running_ )
          {
            // Wait on condition variable while the task is empty and the pool is
            // still running.
            boost::unique_lock< boost::mutex > lock( mutex_ );
            while ( tasks_.empty() && running_ )
            {
              condition_.wait( lock );
            }
            // If pool is no longer running, break out.
            if ( !running_ ) break;
      
            // Copy task locally and remove from the queue.  This is done within
            // its own scope so that the task object is destructed immediately
            // after running the task.  This is useful in the event that the
            // function contains shared_ptr arguments bound via bind.
            {
              boost::function< void() > task = tasks_.front();
              tasks_.pop();
      
              lock.unlock();
      
              // Run the task.
              try
              {
                task();
              }
              // Suppress all exceptions.
              catch ( const std::exception& ) {}
            }
      
            // Task has finished, so increment count of available threads.
            lock.lock();
            ++available_;
          } // while running_
        }
      };
      

      这篇关于线程池使用boost asio的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆