提高:对于偶尔同步任务ASIO线程池实现 [英] boost:asio thread pool implementation for occasionally synchronized tasks
问题描述
我有一个主功能,每个时间步骤执行许多小的,独立的任务各一次。然而,每次步骤后,我必须等待所有的任务挺身而出之前完成。
我想使程序的多线程。我曾尝试与升压分支线程池的实现,我已经使用(共享指针)线程矢量尝试过了,我已经试过了ASIO线程池的想法(使用io_service对象,建立了一些工作,然后分发运行到线程和张贴处理程序io_service对象)。
所有这些似乎有大量的开销创建和销毁线程我很多小的任务,我想pferably使用ASIO工具,从某种意义上说,$ P $来实现一个记录io_service对象,一是thread_group,张贴处理程序到io_service对象,并等待一个时间步的工作,发布更多的任务前完成。有没有做到这一点的好办法?这里的(精简)code为我现在的工作:
的boost ::支持ASIO :: io_service对象io_service对象;
对于(INT theTime = 0; theTime = TOTALTIME;!++ theTime)
{
io_service.reset();
提高:: thread_group线程;
//工作完成中分配后,作用域销毁工作对象
{
提高:: ASIO :: io_service对象::工作工作(io_service对象);
的for(int i = 0; I< maxNumThreads ++ I)
{
threads.create_thread(提高::绑定(安培;提高:: ASIO :: io_service对象::运行,
&放大器; io_service对象));
} 的for(int i = 0; I< numSmallTasks ++ I)
{
io_service.post(升压::绑定(安培; process_data,我,theTime));
}
}
threads.join_all();
}
这就是我宁愿有(但不知道如何实现):
的boost ::支持ASIO :: io_service对象io_service对象;
提高:: thread_group线程;
提高:: ASIO :: io_service对象::工作工作(io_service对象);
的for(int i = 0; I< maxNumThreads ++ I)
{
threads.create_thread(提高::绑定(安培;提高:: ASIO :: io_service对象::运行,
&放大器; io_service对象));
}对于(INT theTime = 0; theTime = TOTALTIME;!++ theTime)
{
的for(int i = 0; I< numSmallTasks ++ I)
{
io_service.post(升压::绑定(安培; process_data,我,theTime));
}
//在这里等着,直到所有的这些任务循环之前完成
// ****我该怎么做呢? *****
}
//后来销毁工作,后来加入的所有主题...
您可以使用<一个href=\"http://www.boost.org/doc/libs/1_51_0/doc/html/thread/synchronization.html#thread.synchronization.futures\"相对=nofollow>期货中进行数据处理,并使用与他们同步<一个href=\"http://www.boost.org/doc/libs/1_51_0/doc/html/thread/synchronization.html#thread.synchronization.futures.reference.wait_for_all\"相对=nofollow> 的boost :: wait_for_all()
。这将允许你在做的部分工作,而不是线程方面工作。
INT process_data(){...}//待期货
的std ::矢量&lt;提高:: unique_future&LT; INT&GT;&GT; pending_data;的for(int i = 0; I&LT; numSmallTasks ++ I)
{
//创建任务和相应的未来
//使用共享PTR和绑定运营商()技巧,因为
// packaged_task是不可复制的,但ASIO :: io_service对象::岗位要求参数可拷贝 //提升1.51语法
//升压1.53+或C ++ 11的std :: packaged_task应提振:: packaged_task&LT; INT()&GT;
TYPEDEF提振:: packaged_task&LT; INT&GT; task_t; 提高:: shared_ptr的&LT; task_t&GT;任务=的boost :: make_shared&LT; task_t&GT;(
提高::绑定(安培; process_data,我,theTime)); 提高:: unique_future&LT; INT&GT; FUT =任务 - &GT; get_future(); pending_data.push_back(性病::移动(FUT));
io_service.post(升压::绑定(安培; task_t ::运算符(),任务));
}//循环后 - 等到所有期货交易进行评估
升压:: wait_for_all(pending_data.begin(),pending_data.end());
I have a "main" function that performs many small, independent tasks each once per time step. However, after each time step, I must wait for all of the tasks to complete before stepping forward.
I want to make the program multithreaded. I have tried implementations with the boost-offshoot threadpool, and I've tried using a vector of (shared pointers to) threads, and I've tried the asio threadpool ideas (using an io_service, establishing some work, then distributing run to the threads and posting handlers to the io_service).
All of these seem to have a lot of overhead creating and destroying threads for my "many small tasks," and I want a way, preferably using the asio tools, to instantiate one io_service, one thread_group, posting handlers to the io_service, and waiting for a single time step's work to be finished before posting more tasks. Is there a good way to do this? Here's (stripped down) code for what I have working now:
boost::asio::io_service io_service;
for(int theTime = 0; theTime != totalTime; ++theTime)
{
io_service.reset();
boost::thread_group threads;
// scoping to destroy the work object after work is finished being assigned
{
boost::asio::io_service::work work(io_service);
for (int i = 0; i < maxNumThreads; ++i)
{
threads.create_thread(boost::bind(&boost::asio::io_service::run,
&io_service));
}
for(int i = 0; i < numSmallTasks; ++i)
{
io_service.post(boost::bind(&process_data, i, theTime));
}
}
threads.join_all();
}
Here's what I had rather have (but don't know how to implement):
boost::asio::io_service io_service;
boost::thread_group threads;
boost::asio::io_service::work work(io_service);
for (int i = 0; i < maxNumThreads; ++i)
{
threads.create_thread(boost::bind(&boost::asio::io_service::run,
&io_service));
}
for(int theTime = 0; theTime != totalTime; ++theTime)
{
for(int i = 0; i < numSmallTasks; ++i)
{
io_service.post(boost::bind(&process_data, i, theTime));
}
// wait here until all of these tasks are finished before looping
// **** how do I do this? *****
}
// destroy work later and join all threads later...
You may use futures for data processing and synchronize with them using boost::wait_for_all()
. This will allow you to operate in terms of parts of work done, not threads.
int process_data() {...}
// Pending futures
std::vector<boost::unique_future<int>> pending_data;
for(int i = 0; i < numSmallTasks; ++i)
{
// Create task and corresponding future
// Using shared ptr and binding operator() trick because
// packaged_task is non-copyable, but asio::io_service::post requires argument to be copyable
// Boost 1.51 syntax
// For Boost 1.53+ or C++11 std::packaged_task shall be boost::packaged_task<int()>
typedef boost::packaged_task<int> task_t;
boost::shared_ptr<task_t> task = boost::make_shared<task_t>(
boost::bind(&process_data, i, theTime));
boost::unique_future<int> fut = task->get_future();
pending_data.push_back(std::move(fut));
io_service.post(boost::bind(&task_t::operator(), task));
}
// After loop - wait until all futures are evaluated
boost::wait_for_all(pending_data.begin(), pending_data.end());
这篇关于提高:对于偶尔同步任务ASIO线程池实现的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!