线程池使用boost asio [英] Thread pool using boost asio
问题描述
我试图创建一个有限的线程池类使用boost :: asio。
I am trying to create a limited thread pool class using boost::asio. But I am stuck at one point can some one help me.
唯一的问题是我应该减少计数器的地方?
The only problem is the place where I should decrease counter?
代码不能正常工作。
问题是我不知道什么时候我的线程将完成执行,我将如何知道它已返回池
the problem is I don't know when my thread will finish execution and how I will come to know that it has return to pool
#include <boost/asio.hpp>
#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>
#include <stack>
using namespace std;
using namespace boost;
class ThreadPool
{
static int count;
int NoOfThread;
thread_group grp;
mutex mutex_;
asio::io_service io_service;
int counter;
stack<thread*> thStk ;
public:
ThreadPool(int num)
{
NoOfThread = num;
counter = 0;
mutex::scoped_lock lock(mutex_);
if(count == 0)
count++;
else
return;
for(int i=0 ; i<num ; ++i)
{
thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service)));
}
}
~ThreadPool()
{
io_service.stop();
grp.join_all();
}
thread* getThread()
{
if(counter > NoOfThread)
{
cout<<"run out of threads \n";
return NULL;
}
counter++;
thread* ptr = thStk.top();
thStk.pop();
return ptr;
}
};
int ThreadPool::count = 0;
struct callable
{
void operator()()
{
cout<<"some task for thread \n";
}
};
int main( int argc, char * argv[] )
{
callable x;
ThreadPool pool(10);
thread* p = pool.getThread();
cout<<p->get_id();
//how i can assign some function to thread pointer ?
//how i can return thread pointer after work done so i can add
//it back to stack?
return 0;
}
推荐答案
使用另一个函数包装用户提供的任务:
In short, you need to wrap the user's provided task with another function that will:
- 调用用户函数或可调用对象。
- 锁定互斥体并递减计数器。
这个线程池的要求。因此,为了清楚起见,这里是一个明确的列表,我相信是什么要求:
I may not be understanding all the requirements for this thread pool. Thus, for clarity, here is an explicit list as to what I believe are the requirements:
- 池管理线程的生命周期。
- 用户可以以非侵入方式将任务分配给池。
- 正在分配任务时,如果池中的所有主题都正在运行其他任务,则会舍弃该任务。
在我提供实现之前,有几个要点我想强调:
Before I provide an implementation, there are a few key points I would like to stress:
- 一旦线程启动,运行直到完成,取消或终止。线程正在执行的函数不能重新分配。为了允许单个线程在其生命周期中执行多个函数,线程将希望使用将从队列中读取的函数来启动,例如
io_service :: run()
,并且可调用类型被发布到事件队列中,例如从io_service :: post()
。 如果io_service $ c>中没有待处理的工作,
-
io_service :: run $ c>,
io_service
被停止,或者线程正在运行的处理程序抛出异常。为了防止io_serivce :: run()
从没有未完成的工作时返回,io_service :: work
- 定义任务的类型要求(即任务的类型必须可通过
object()
语法调用),而不是类型(即任务必须继承process
),为用户提供了更多的灵活性。它允许用户提供作为函数指针的任务或提供nullaryoperator()的类型。
- Once a thread has been launched, it will run until completion, cancellation, or termination. The function the thread is executing cannot be reassigned. To allow for a single thread to execute multiple functions over the course of its life, the thread will want to launch with a function that will read from a queue, such as
io_service::run()
, and callable types are posted into the event queue, such as fromio_service::post()
. io_service::run()
returns if there is no work pending in theio_service
, theio_service
is stopped, or an exception is thrown from a handler that the thread was running. To preventio_serivce::run()
from returning when there is no unfinished work, theio_service::work
class can be used.- Defining the task's type requirements (i.e. the task's type must be callable by
object()
syntax) instead of requiring a type (i.e. task must inherit fromprocess
), provides more flexibility to the user. It allows the user to supply a task as a function pointer or a type providing a nullaryoperator()
.
使用 boost :: asio
实现:
#include <boost/asio.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
boost::asio::io_service io_service_;
boost::asio::io_service::work work_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: work_( io_service_ ),
available_( pool_size )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &boost::asio::io_service::run,
&io_service_ ) );
}
}
/// @brief Destructor.
~thread_pool()
{
// Force all threads to return from io_service::run().
io_service_.stop();
// Suppress all exceptions.
try
{
threads_.join_all();
}
catch ( const std::exception& ) {}
}
/// @brief Adds a task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Post a wrapped task into the queue.
io_service_.post( boost::bind( &thread_pool::wrap_task, this,
boost::function< void() >( task ) ) );
}
private:
/// @brief Wrap a task so that the available count can be increased once
/// the user provided task has completed.
void wrap_task( boost::function< void() > task )
{
// Run the user supplied task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
// Task has finished, so increment count of available threads.
boost::unique_lock< boost::mutex > lock( mutex_ );
++available_;
}
};
关于实施的几点意见:
- 异常处理需要发生在用户的任务周围。如果用户的函数或可调用对象抛出的异常不是
boost :: thread_interrupted
,那么std :: terminate()
被调用。这是Boost.Thread的线程函数中的异常行为。还值得阅读Boost.Asio的从处理程序抛出的异常效应。 - 如果用户通过<$ c提供
任务
$ c> boost :: bind ,则嵌套的boost :: bind
将无法编译。需要以下选项之一:
- 不支持
创建的
任务
如果的结果,则基于用户的类型来执行编译时分支的元编程:
boost :: bind ,以便可以使用
boost :: protect
,因为boost :: protect
只能在某些功能对象上正常运行。 - 不支持
- 使用另一种类型间接传递
任务
对象。我选择使用boost :: function
以获得可读性,代价是丢失确切的类型。boost :: tuple
,虽然稍微不太可读,但也可以用于保留确切的类型,如Boost.Asio的序列化示例。
- Exception handling needs to occur around the user's task. If the user's function or callable object throws an exception that is not of type
boost::thread_interrupted
, thenstd::terminate()
is called. This is the the result of Boost.Thread's exceptions in thread functions behavior. It is also worth reading Boost.Asio's effect of exceptions thrown from handlers. - If the user provides the
task
viaboost::bind
, then the nestedboost::bind
will fail to compile. One of the following options is required:- Not support
task
created byboost::bind
. - Meta-programming to perform compile-time branching based on whether or not the user's type if the result of
boost::bind
so thatboost::protect
could be used, asboost::protect
only functions properly on certain function objects. - Use another type to pass the
task
object indirectly. I opted to useboost::function
for readability at the cost of losing the exact type.boost::tuple
, while slightly less readable, could also be used to preserve the exact type, as seen in the Boost.Asio's serialization example.
应用程序代码现在可以非干扰地使用
thread_pool
p>
Application code can now use the
thread_pool
type non-intrusively:void work() {}; struct worker { void operator()() {}; }; void more_work( int ) {}; int main() { thread_pool pool( 2 ); pool.run_task( work ); // Function pointer. pool.run_task( worker() ); // Callable object. pool.run_task( boost::bind( more_work, 5 ) ); // Callable object. }
thread_pool
在没有Boost.Asio的情况下创建,并且对于维护者来说可能更容易一些,因为他们不再需要了解Boost.Asio
行为,例如io_service :: run()
return,什么是io_service :: work
对象:The
thread_pool
could be created without Boost.Asio, and may be slightly easier for maintainers, as they no longer need to know aboutBoost.Asio
behaviors, such as when doesio_service::run()
return, and what is theio_service::work
object:#include <queue> #include <boost/bind.hpp> #include <boost/thread.hpp> class thread_pool { private: std::queue< boost::function< void() > > tasks_; boost::thread_group threads_; std::size_t available_; boost::mutex mutex_; boost::condition_variable condition_; bool running_; public: /// @brief Constructor. thread_pool( std::size_t pool_size ) : available_( pool_size ), running_( true ) { for ( std::size_t i = 0; i < pool_size; ++i ) { threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ; } } /// @brief Destructor. ~thread_pool() { // Set running flag to false then notify all threads. { boost::unique_lock< boost::mutex > lock( mutex_ ); running_ = false; condition_.notify_all(); } try { threads_.join_all(); } // Suppress all exceptions. catch ( const std::exception& ) {} } /// @brief Add task to the thread pool if a thread is currently available. template < typename Task > void run_task( Task task ) { boost::unique_lock< boost::mutex > lock( mutex_ ); // If no threads are available, then return. if ( 0 == available_ ) return; // Decrement count, indicating thread is no longer available. --available_; // Set task and signal condition variable so that a worker thread will // wake up andl use the task. tasks_.push( boost::function< void() >( task ) ); condition_.notify_one(); } private: /// @brief Entry point for pool threads. void pool_main() { while( running_ ) { // Wait on condition variable while the task is empty and the pool is // still running. boost::unique_lock< boost::mutex > lock( mutex_ ); while ( tasks_.empty() && running_ ) { condition_.wait( lock ); } // If pool is no longer running, break out. if ( !running_ ) break; // Copy task locally and remove from the queue. This is done within // its own scope so that the task object is destructed immediately // after running the task. This is useful in the event that the // function contains shared_ptr arguments bound via bind. { boost::function< void() > task = tasks_.front(); tasks_.pop(); lock.unlock(); // Run the task. try { task(); } // Suppress all exceptions. catch ( const std::exception& ) {} } // Task has finished, so increment count of available threads. lock.lock(); ++available_; } // while running_ } };
这篇关于线程池使用boost asio的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
- Not support