延长线程的生命周期(C ++ 11) [英] Extend the life of threads with synchronization (C++11)

查看:159
本文介绍了延长线程的生命周期(C ++ 11)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个程序有一个函数,它接受一个指针作为arg和一个主。主要是创建 n 线程,每个线程根据传递的 arg 在不同的内存区域上运行函数。线程然后被连接,主要在区域之间执行一些数据混合,并创建 n 个新线程,这与旧的线程执行相同的操作。



为了改进程序,我想保持线程活着,删除创建它们所需的长时间。线程应该睡觉时,主要工作和通知,当他们必须再次来。以同样的方式,主线程应该等待,当线程工作,因为它与加入。



我不能结束这个强大的实现,总是陷入死锁。



简单的基准代码,任何关于如何修改此项的提示都会非常感激。

  #include< thread> 
#include< climits>

...

void myfunc(void * p){
do_something(p);
}

int main(){
void * myp [n_threads] {a_location,another_location,...}
std :: thread mythread [n_threads];
for(unsigned long int j = 0; j for(unsigned int i = 0; i mythread [i] = std :: thread(myfunc,myp [i]);
}
for(unsigned int i = 0; i mythread [i] .join();
}
mix_data(myp);
}
return 0;
}


解决方案

只有来自C ++ 11标准库的类。基本上,你创建的每个线程都有一个相关的命令队列(封装在 std :: packaged_task<> 对象)。如果队列为空,线程将只等待一个条件变量( std :: condition_variable )。



通过使用 std :: mutex std避免数据竞争: :unique_lock<> RAII包装器,主线程可以通过存储 std :: future<> 对象关联到每个提交的 std :: packaged_tast<> 并调用 wait()

下面是一个遵循这个设计的简单程序。注释应该足以解释它的作用:

  #include< thread> 
#include< iostream>
#include< sstream>
#include< future>
#include< queue>
#include< condition_variable>
#include< mutex>

//便利类型定义
using job = std :: packaged_task< void()>

//与每个线程相关的一些数据。
struct threaded_data
{
int id; //可以使用thread :: id,但这是在线程启动之前填充
std :: thread t; //线程对象
std :: queue< job>工作; //作业队列
std :: condition_variable cv; //等待线程的条件变量
std :: mutex m; //用于避免数据竞争的互斥体
bool stop = false; //设置时,此标志告诉线程它应该退出
};

//每个线程执行的线程函数
void thread_func(thread_data * pData)
{
std :: unique_lock< std :: mutex> l(pData-> m,std :: defer_lock);
while(true)
{
l.lock();

//等待队列不会为空或停止被发出信号
pData-> cv.wait(l,[pData](){
return - > stop ||!pData-> jobs.empty());
});

//停止发出信号,让我们退出线程
if(pData-> stop){return; }

//从队列中弹出一个任务...
job j = std :: move(pData-> jobs.front());
pData-> jobs.pop();

l.unlock();

//执行任务!
j();
}
}

//创建简单任务的函数
job create_task(int id,int jobNumber)
{
job j ([id,jobNumber]()
{
std :: stringstream s;
s<<Hello<< id<< jobNumber<< std :: endl;
std :: cout<<< s.str();
});

return j;
}

int main()
{
const int numThreads = 4;
const int numJobsPerThread = 10;
std :: vector< std :: future< void>>期货;

//创建所有线程(将等待作业)
thread_data threads [numThreads];
int tdi = 0;
for(auto& td:threads)
{
td.id = tdi ++;
td.t = std :: thread(thread_func,& td);
}

// =================================== ==============
//开始为每个线程分配作业...

for(auto& td:threads)
{
for(int i = 0; i {
job j = create_task(td.id,i);
futures.push_back(j.get_future());

std :: unique_lock< std :: mutex> 1(td.m);
td.jobspush(std :: move(j));
}

//通知线程有工作做...
td.cv.notify_one();
}

//等待所有任务完成...
for(auto& f:futures){f.wait(); }
futures.clear();


// ==================================== =============
//这里主线程执行某些操作...

std :: cin.get();

// ...完成!
// ========================================== =====


// =============================== ==================
//发布一些新任务...

for(auto& td:threads)
{
for(int i = 0; i {
job j = create_task(td.id,i);
futures.push_back(j.get_future());

std :: unique_lock< std :: mutex> 1(td.m);
td.jobs.push(std :: move(j));
}

//通知线程有工作做...
td.cv.notify_one();
}

//等待所有任务完成...
for(auto& f:futures){f.wait(); }
futures.clear();

//向所有线程发送停止信号并加入它们...
for(auto& td:threads)
{
std :: unique_lock< std: :mutex> 1(td.m);
td.stop = true;
td.cv.notify_one();
}

//加入所有的线程
for(auto& td:threads){td.t.join }
}


I have a program with a function which takes a pointer as arg, and a main. The main is creating n threads, each of them running the function on different memory areas depending on the passed arg. Threads are then joined, the main performs some data mixing between the area and creates n new threads which do the the same operation as the old ones.

To improve the program I would like to keep the threads alive, removing the long time necessary to create them. Threads should sleep when the main is working and notified when they have to come up again. At the same way the main should wait when threads are working as it did with join.

I cannot end up with a strong implementation of this, always falling in a deadlock.

Simple baseline code, any hints about how to modify this would be much appreciated

#include <thread>
#include <climits>

...

void myfunc(void * p) {
  do_something(p);
}

int main(){
  void * myp[n_threads] {a_location, another_location,...};
  std::thread mythread[n_threads];
  for (unsigned long int j=0; j < ULONG_MAX; j++) {
    for (unsigned int i=0; i < n_threads; i++) {
      mythread[i] = std::thread(myfunc, myp[i]);
    }
    for (unsigned int i=0; i < n_threads; i++) {
      mythread[i].join();
    }
    mix_data(myp); 
  }
  return 0;
}

解决方案

Here is a possible approach using only classes from the C++11 Standard Library. Basically, each thread you create has an associated command queue (encapsulated in std::packaged_task<> objects) which it continuously check. If the queue is empty, the thread will just wait on a condition variable (std::condition_variable).

While data races are avoided through the use of std::mutex and std::unique_lock<> RAII wrappers, the main thread can wait for a particular job to be terminated by storing the std::future<> object associated to each submitted std::packaged_tast<> and call wait() on it.

Below is a simple program that follows this design. Comments should be sufficient to explain what it does:

#include <thread>
#include <iostream>
#include <sstream>
#include <future>
#include <queue>
#include <condition_variable>
#include <mutex>

// Convenience type definition
using job = std::packaged_task<void()>;

// Some data associated to each thread.
struct thread_data
{
    int id; // Could use thread::id, but this is filled before the thread is started
    std::thread t; // The thread object
    std::queue<job> jobs; // The job queue
    std::condition_variable cv; // The condition variable to wait for threads
    std::mutex m; // Mutex used for avoiding data races
    bool stop = false; // When set, this flag tells the thread that it should exit
};

// The thread function executed by each thread
void thread_func(thread_data* pData)
{
    std::unique_lock<std::mutex> l(pData->m, std::defer_lock);
    while (true)
    {
        l.lock();

        // Wait until the queue won't be empty or stop is signaled
        pData->cv.wait(l, [pData] () {
            return (pData->stop || !pData->jobs.empty()); 
            });

        // Stop was signaled, let's exit the thread
        if (pData->stop) { return; }

        // Pop one task from the queue...
        job j = std::move(pData->jobs.front());
        pData->jobs.pop();

        l.unlock();

        // Execute the task!
        j();
    }
}

// Function that creates a simple task
job create_task(int id, int jobNumber)
{
    job j([id, jobNumber] ()
    {
        std::stringstream s;
        s << "Hello " << id << "." << jobNumber << std::endl;
        std::cout << s.str();
    });

    return j;
}

int main()
{
    const int numThreads = 4;
    const int numJobsPerThread = 10;
    std::vector<std::future<void>> futures;

    // Create all the threads (will be waiting for jobs)
    thread_data threads[numThreads];
    int tdi = 0;
    for (auto& td : threads)
    {
        td.id = tdi++;
        td.t = std::thread(thread_func, &td);
    }

    //=================================================
    // Start assigning jobs to each thread...

    for (auto& td : threads)
    {
        for (int i = 0; i < numJobsPerThread; i++)
        {
            job j = create_task(td.id, i);
            futures.push_back(j.get_future());

            std::unique_lock<std::mutex> l(td.m);
            td.jobs.push(std::move(j));
        }

        // Notify the thread that there is work do to...
        td.cv.notify_one();
    }

    // Wait for all the tasks to be completed...
    for (auto& f : futures) { f.wait(); }
    futures.clear();


    //=================================================
    // Here the main thread does something...

    std::cin.get();

    // ...done!
    //=================================================


    //=================================================
    // Posts some new tasks...

    for (auto& td : threads)
    {
        for (int i = 0; i < numJobsPerThread; i++)
        {
            job j = create_task(td.id, i);
            futures.push_back(j.get_future());

            std::unique_lock<std::mutex> l(td.m);
            td.jobs.push(std::move(j));
        }

        // Notify the thread that there is work do to...
        td.cv.notify_one();
    }

    // Wait for all the tasks to be completed...
    for (auto& f : futures) { f.wait(); }
    futures.clear();

    // Send stop signal to all threads and join them...
    for (auto& td : threads)
    {
        std::unique_lock<std::mutex> l(td.m);
        td.stop = true;
        td.cv.notify_one();
    }

    // Join all the threads
    for (auto& td : threads) { td.t.join(); }
}

这篇关于延长线程的生命周期(C ++ 11)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆