同步任务 [英] Synchronizing tasks

查看:146
本文介绍了同步任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在开发一个使用线程池的应用程序,向线程池提交任务并对它们进行同步。主线程必须等待同一次循环迭代中提交的所有任务全部完成,然后才能提交下一批任务(因为下一次迭代的任务操作的是相同的数据,并且它们彼此依赖)。

我的问题是,最好的方法是什么?



到目前为止,我想到的办法是:每个线程在完成任务后,将一个原子无符号整数加一。当该整数等于已提交的任务数量时,主线程继续工作并提交下一轮任务。



这是我的第一个多线程应用程序。
这是处理这个问题的最佳且合理的方法吗?



我使用的线程池类摘自 Anthony Williams 的一本优秀的书《C++ Concurrency in Action》(C++ 并发编程实战)。



以下是类:

  class thread_pool 
{
std :: atomic_bool done;
thread_safe_queue< std :: function< void()>> work_queue;
std :: vector< std :: thread> threads ;
join_threads joiner;

void worker_thread()
{
while(!done)
{
std :: function< void )> task;
if(work_queue.try_pop(task))
{
task();
}
else
{
std :: this_thread :: yield();
}
}
}
public:
thread_pool():
done(false),joiner线程)
{
unsigned const thread_count = std :: thread :: hardware_concurrency();
try
{
for(unsigned i = 0; i< thread_count; ++ i)
{
threads.push_back(
std :: thread (& thread_pool :: worker_thread,this));
}
}
catch(...)
{
done = true;
throw;
}
}

〜thread_pool()
{
done = true;
}

template< typename FunctionType>
void submit(FunctionType f)
{
work_queue.push(std :: function< void()>(f));
}
};

/// Queue whose every operation takes an internal mutex, so it can be shared
/// between threads. A condition variable lets consumers block until an element
/// is available.
template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;             // mutable so that empty() const can lock it
    std::queue<T> data_queue;
    std::condition_variable data_cond;  // signalled by push()
public:
    threadsafe_queue()
    {}

    /// Appends `new_value` and wakes one thread blocked in wait_and_pop().
    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(std::move(new_value));
        data_cond.notify_one();
    }

    /// Blocks until the queue is non-empty, then moves the front element into `value`.
    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{ return !data_queue.empty(); });
        value = std::move(data_queue.front());
        data_queue.pop();
    }

    /// Blocks until the queue is non-empty, then returns the front element.
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{ return !data_queue.empty(); });
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    /// Non-blocking pop.
    /// @return false if the queue was empty (`value` untouched), true if an
    ///         element was moved into `value`.
    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return false;
        value = std::move(data_queue.front());
        data_queue.pop();
        return true;  // was missing: flowing off the end of a bool function is UB
    }

    /// Non-blocking pop.
    /// @return an empty shared_ptr if the queue was empty, else the front element.
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    /// @return true if the queue held no elements at the moment of the call.
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};

main()函数:

  // Round-completion handshake between the pool's tasks and main().
  std::condition_variable waitForThreads;
  std::mutex mut;

  // Number of tasks of the current round that have finished so far.
  // Brace-initialised: `std::atomic<unsigned> x = 0;` is ill-formed before C++17.
  std::atomic<unsigned> doneCount{0};

  unsigned threadCount = 4; // sample concurrent thread count used for testing

  // True once every task of the current round has finished. Guarded by `mut`.
  bool roundFinished = false;

  /// Called by each task when it finishes. The task that completes the round
  /// resets the counter and wakes main().
  void synchronizeWork()
  {
      // Test the value produced by the increment itself. Incrementing and then
      // loading separately lets several threads (or none) observe the final count.
      if (++doneCount == threadCount)
      {
          doneCount = 0;
          {
              // The flag is written under the mutex, so a notification sent
              // before main() starts waiting is not lost.
              std::lock_guard<std::mutex> lock(mut);
              roundFinished = true;
          }
          waitForThreads.notify_one();
      }
  }

  /// Sample task: prints the id of the executing thread, sleeps for three
  /// seconds, then reports completion.
  void Task_A()
  {
      std::cout << "Task A, thread id: " << std::this_thread::get_id() << std::endl;
      std::this_thread::sleep_for(std::chrono::milliseconds(3000));
      synchronizeWork();
  }

  int main()
  {
      thread_pool threadPool;

      for (int i = 0; i < 1000; ++i)
      {
          // Submit exactly as many tasks as synchronizeWork() counts; submitting
          // hardware_concurrency() tasks while counting to threadCount
          // desynchronises the rounds whenever the two values differ.
          for (unsigned j = 0; j < threadCount; j++)
              threadPool.submit(Task_A);

          // Wait for the whole round. The predicate guards against spurious
          // wakeups and against a notification that arrived before the wait.
          {
              std::unique_lock<std::mutex> lock(mut);
              waitForThreads.wait(lock, [] { return roundFinished; });
              roundFinished = false;  // re-arm for the next round
          }
      }
  }


解决方案

我不熟悉你正在使用的线程池类。



不使用这样的类,通常的做法如下:

  // Start three threads, then block until each of them has finished.
  // pause_thread is not shown here; it is the function each thread runs.
  std::cout << "Spawning 3 threads...\n";
  std::thread t1(pause_thread, 1);
  std::thread t2(pause_thread, 2);
  std::thread t3(pause_thread, 3);
  std::cout << "Done spawning threads. Now waiting for them to join:\n";
  // join() returns only after the corresponding thread has finished.
  t1.join();
  t2.join();
  t3.join();
  std::cout << "All threads joined!\n";



我想,任何像样的线程池类都会允许你做同样的事情,甚至更简单:它会提供一个方法,阻塞直到所有线程都完成。我建议您仔细检查文档。


I am developing an application that uses a threadpool, submits tasks to it and synchronizes them. The main thread has to wait until all the submitted tasks from a single loop iteration finish and then it submits another bunch of tasks (because the tasks from the next iteration operate on the same data and they will be dependent on one another).

My question is, what is the best way to do that?

So far, what I have come up with is that each thread, after finishing a task, increments an atomic unsigned integer. When the integer equals the number of submitted tasks, the main thread continues its work and submits another round of tasks.

This is my first multithreaded application. Is this an optimal and sensible way of dealing with this problem?

I'm using a threadpool class copied from an excellent book, "C++ Concurrency in Action" by Anthony Williams.

Here are the classes:

/// Minimal fixed-size thread pool (the question attributes it to the book
/// "C++ Concurrency in Action").
///
/// Each worker loops until `done` is set: it tries to pop a task from
/// `work_queue` and runs it, or yields when the queue is empty. There is no
/// built-in way to wait for a submitted task to finish.
class thread_pool
{
    std::atomic_bool done;  // set to true to make the workers leave their loop
    // NOTE(review): the queue class shown with this pool is spelled
    // `threadsafe_queue`; confirm that `thread_safe_queue` names the same type.
    thread_safe_queue<std::function<void()> > work_queue;
    std::vector<std::thread> threads;
    // Declared last, so it is destroyed first. Presumably joins `threads` in
    // its destructor -- join_threads is defined elsewhere; confirm.
    join_threads joiner;

    /// Worker loop: run queued tasks until `done` becomes true.
    void worker_thread()
    {
        while(!done)
        {
            std::function<void()> task;
            if(work_queue.try_pop(task))
            {
                task();
            }
            else
            {
                // Nothing to do right now: give up the time slice and poll again.
                std::this_thread::yield();
            }
        }
    }
public:
    /// Starts one worker per hardware thread (at least one).
    /// If starting a worker throws, `done` is set and the exception is rethrown.
    thread_pool():
        done(false),joiner(threads)
    {
        // hardware_concurrency() may return 0 when the value is not computable;
        // a pool with zero workers would never run any task.
        unsigned const hw_threads=std::thread::hardware_concurrency();
        unsigned const thread_count=(hw_threads!=0)?hw_threads:1;
        try
        {
            for(unsigned i=0;i<thread_count;++i)
            {
                threads.push_back(
                    std::thread(&thread_pool::worker_thread,this));
            }
        }
        catch(...)
        {
            done=true;
            throw;
        }
    }

    /// Asks the workers to stop. Tasks still in the queue are not run, because
    /// a worker checks `done` before it pops the next task.
    ~thread_pool()
    {
        done=true;
    }

    /// Wraps `f` in a std::function<void()> and enqueues it for some worker.
    /// Returns immediately; no handle to the task is given back.
    template<typename FunctionType>
    void submit(FunctionType f)
    {
        work_queue.push(std::function<void()>(std::move(f)));
    }
};

/// Queue whose every operation takes an internal mutex, so it can be shared
/// between threads. A condition variable lets consumers block until an element
/// is available.
template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;             // mutable so that empty() const can lock it
    std::queue<T> data_queue;
    std::condition_variable data_cond;  // signalled by push()
public:
    threadsafe_queue()
    {}

    /// Appends `new_value` and wakes one thread blocked in wait_and_pop().
    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(std::move(new_value));
        data_cond.notify_one();
    }

    /// Blocks until the queue is non-empty, then moves the front element into `value`.
    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{ return !data_queue.empty(); });
        value = std::move(data_queue.front());
        data_queue.pop();
    }

    /// Blocks until the queue is non-empty, then returns the front element.
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{ return !data_queue.empty(); });
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    /// Non-blocking pop.
    /// @return false if the queue was empty (`value` untouched), true if an
    ///         element was moved into `value`.
    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return false;
        value = std::move(data_queue.front());
        data_queue.pop();
        return true;  // was missing: flowing off the end of a bool function is UB
    }

    /// Non-blocking pop.
    /// @return an empty shared_ptr if the queue was empty, else the front element.
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res(
            std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    /// @return true if the queue held no elements at the moment of the call.
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};

The main() function:

// Round-completion handshake between the pool's tasks and main().
std::condition_variable waitForThreads;
std::mutex mut;

// Number of tasks of the current round that have finished so far.
// Brace-initialised: `std::atomic<unsigned> x = 0;` is ill-formed before C++17.
std::atomic<unsigned> doneCount{0};

unsigned threadCount = 4; // sample concurrent thread count that I use for testing

// True once every task of the current round has finished. Guarded by `mut`.
bool roundFinished = false;

/// Called by each task when it finishes. The task that completes the round
/// resets the counter and wakes main().
void synchronizeWork()
{
    // Test the value produced by the increment itself. Incrementing and then
    // loading separately lets several threads (or none) observe the final count.
    if (++doneCount == threadCount)
    {
        doneCount = 0;
        {
            // The flag is written under the mutex, so a notification sent
            // before main() starts waiting is not lost.
            std::lock_guard<std::mutex> lock(mut);
            roundFinished = true;
        }
        waitForThreads.notify_one();
    }
}

/// Sample task: prints the id of the executing thread, sleeps for three
/// seconds, then reports completion.
void Task_A()
{
    std::cout << "Task A, thread id: " << std::this_thread::get_id() << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(3000));
    synchronizeWork();
}

int main()
{
    thread_pool threadPool;

    for (int i = 0; i < 1000; ++i)
    {
        // Submit exactly as many tasks as synchronizeWork() counts; submitting
        // hardware_concurrency() tasks while counting to threadCount
        // desynchronises the rounds whenever the two values differ.
        for (unsigned j = 0; j < threadCount; j++)
            threadPool.submit(Task_A);

        // Wait for the whole round. The predicate guards against spurious
        // wakeups and against a notification that arrived before the wait.
        {
            std::unique_lock<std::mutex> lock(mut);
            waitForThreads.wait(lock, [] { return roundFinished; });
            roundFinished = false;  // re-arm for the next round
        }
    }
}

解决方案

I am not familiar with the threadpool class you are using.

Without using such a class, the usual way to do this looks like this:

  std::cout << "Spawning 3 threads...\n";
  std::thread t1 (pause_thread,1);
  std::thread t2 (pause_thread,2);
  std::thread t3 (pause_thread,3);
  std::cout << "Done spawning threads. Now waiting for them to join:\n";
  t1.join();
  t2.join();
  t3.join();
  std::cout << "All threads joined!\n";

I would imagine that any decent threadpool class would allow you to do the same sort of thing, even more simply, by giving you a method to block until all the threads have completed. I suggest you double check the documentation.

这篇关于同步任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆