提高:对于偶尔同步任务ASIO线程池实现 [英] boost:asio thread pool implementation for occasionally synchronized tasks

查看:182
本文介绍了提高:对于偶尔同步任务ASIO线程池实现的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个主功能,每个时间步骤执行许多小的,独立的任务各一次。然而,每次步骤后,我必须等待所有的任务挺身而出之前完成。

我想使程序的多线程。我曾尝试与升压分支线程池的实现,我已经使用(共享指针)线程矢量尝试过了,我已经试过了ASIO线程池的想法(使用io_service对象,建立了一些工作,然后分发运行到线程和张贴处理程序io_service对象)。

所有这些似乎有大量的开销创建和销毁线程我很多小的任务,我想pferably使用ASIO工具,从某种意义上说,$ P $来实现一个记录io_service对象,一是thread_group,张贴处理程序到io_service对象,并等待一个时间步的工作,发布更多的任务前完成。有没有做到这一点的好办法?这里的(精简)code为我现在的工作:

 的boost ::支持ASIO :: io_service对象io_service对象;
对于(INT theTime = 0; theTime = TOTALTIME;!++ theTime)
{
    io_service.reset();
    提高:: thread_group线程;
    //工作完成中分配后,作用域销毁工作对象
    {
        提高:: ASIO :: io_service对象::工作工作(io_service对象);
        的for(int i = 0; I< maxNumThreads ++ I)
        {
            threads.create_thread(提高::绑定(安培;提高:: ASIO :: io_service对象::运行,
                &放大器; io_service对象));
        }        的for(int i = 0; I< numSmallTask​​s ++ I)
        {
            io_service.post(升压::绑定(安培; process_data,我,theTime));
        }
    }
    threads.join_all();
}

这就是我宁愿有(但不知道如何实现):

 的boost ::支持ASIO :: io_service对象io_service对象;
提高:: thread_group线程;
提高:: ASIO :: io_service对象::工作工作(io_service对象);
的for(int i = 0; I< maxNumThreads ++ I)
{
    threads.create_thread(提高::绑定(安培;提高:: ASIO :: io_service对象::运行,
         &放大器; io_service对象));
}对于(INT theTime = 0; theTime = TOTALTIME;!++ theTime)
{
    的for(int i = 0; I< numSmallTask​​s ++ I)
    {
        io_service.post(升压::绑定(安培; process_data,我,theTime));
    }
    //在这里等着,直到所有的这些任务循环之前完成
    // ****我该怎么做呢? *****
}
//后来销毁工作,后来加入的所有主题...


解决方案

您可以使用<一个href=\"http://www.boost.org/doc/libs/1_51_0/doc/html/thread/synchronization.html#thread.synchronization.futures\"相对=nofollow>期货中进行数据处理,并使用与他们同步<一个href=\"http://www.boost.org/doc/libs/1_51_0/doc/html/thread/synchronization.html#thread.synchronization.futures.reference.wait_for_all\"相对=nofollow> 的boost :: wait_for_all() 。这将允许你在做的部分工作,而不是线程方面工作。

  INT process_data(){...}//待期货
的std ::矢量&lt;提高:: unique_future&LT; INT&GT;&GT; pending_data;的for(int i = 0; I&LT; numSmallTask​​s ++ I)
{
   //创建任务和相应的未来
   //使用共享PTR和绑定运营商()技巧,因为
   // packaged_task是不可复制的,但ASIO :: io_service对象::岗位要求参数可拷贝   //提升1.51语法
   //升压1.53+或C ++ 11的std :: packaged_task应提振:: packaged_task&LT; INT()&GT;
   TYPEDEF提振:: packaged_task&LT; INT&GT; task_t;   提高:: shared_ptr的&LT; task_t&GT;任务=的boost :: make_shared&LT; task_t&GT;(
      提高::绑定(安培; process_data,我,theTime));   提高:: unique_future&LT; INT&GT; FUT =任务 - &GT; get_future();   pending_data.push_back(性病::移动(FUT));
   io_service.post(升压::绑定(安培; task_t ::运算符(),任务));
}//循环后 - 等到所有期货交易进行评估
升压:: wait_for_all(pending_data.begin(),pending_data.end());

I have a "main" function that performs many small, independent tasks each once per time step. However, after each time step, I must wait for all of the tasks to complete before stepping forward.

I want to make the program multithreaded. I have tried implementations with the boost-offshoot threadpool, and I've tried using a vector of (shared pointers to) threads, and I've tried the asio threadpool ideas (using an io_service, establishing some work, then distributing run to the threads and posting handlers to the io_service).

All of these seem to have a lot of overhead creating and destroying threads for my "many small tasks," and I want a way, preferably using the asio tools, to instantiate one io_service, one thread_group, posting handlers to the io_service, and waiting for a single time step's work to be finished before posting more tasks. Is there a good way to do this? Here's (stripped down) code for what I have working now:

boost::asio::io_service io_service;
for(int theTime = 0; theTime != totalTime; ++theTime)
{
    io_service.reset();
    boost::thread_group threads;
    // scoping to destroy the work object after work is finished being assigned
    {
        boost::asio::io_service::work work(io_service);
        for (int i = 0; i < maxNumThreads; ++i)
        {
            threads.create_thread(boost::bind(&boost::asio::io_service::run,
                &io_service));
        }

        for(int i = 0; i < numSmallTasks; ++i)
        {
            io_service.post(boost::bind(&process_data, i, theTime));
        }
    }
    threads.join_all(); 
}

Here's what I had rather have (but don't know how to implement):

boost::asio::io_service io_service;
boost::thread_group threads;
boost::asio::io_service::work work(io_service);
for (int i = 0; i < maxNumThreads; ++i)
{
    threads.create_thread(boost::bind(&boost::asio::io_service::run,
         &io_service));
}

for(int theTime = 0; theTime != totalTime; ++theTime)
{
    for(int i = 0; i < numSmallTasks; ++i)
    {
        io_service.post(boost::bind(&process_data, i, theTime));
    }
    // wait here until all of these tasks are finished before looping 
    // **** how do I do this? *****
}
// destroy work later and join all threads later...

解决方案

You may use futures for data processing and synchronize with them using boost::wait_for_all(). This will allow you to operate in terms of parts of work done, not threads.

int process_data() {...}

// Pending futures
std::vector<boost::unique_future<int>> pending_data;

for(int i = 0; i < numSmallTasks; ++i)
{
   // Create task and corresponding future
   // Using shared ptr and binding operator() trick because
   // packaged_task is non-copyable, but asio::io_service::post requires argument to be copyable

   // Boost 1.51 syntax
   // For Boost 1.53+ or C++11 std::packaged_task shall be boost::packaged_task<int()>
   typedef boost::packaged_task<int> task_t;

   boost::shared_ptr<task_t> task = boost::make_shared<task_t>(
      boost::bind(&process_data, i, theTime));

   boost::unique_future<int> fut = task->get_future();

   pending_data.push_back(std::move(fut));
   io_service.post(boost::bind(&task_t::operator(), task));    
}

// After loop - wait until all futures are evaluated
boost::wait_for_all(pending_data.begin(), pending_data.end()); 

这篇关于提高:对于偶尔同步任务ASIO线程池实现的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆