std :: condition_variable - 等待几个线程通知观察者 [英] std::condition_variable - Wait for several threads to notify observer

查看:1935
本文介绍了std :: condition_variable - 等待几个线程通知观察者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的问题看起来像这样:



我有一个观察者持有std :: condition_variable和std :: mutex,我的工作线程对象有一个指向观察者的指针。每次工作线程完成它的工作,它调用m_Observer-> NotifyOne(),然后调用condition_variable的notify_one()函数。现在我想做的是,启动一束工作线程,每个具有不同的工作和不同的(独立)数据,并等待所有的他们发信号(使用m_Observer-> NotifyOne())观察员,使我能够基于所有工作线程的结果继续工作。



我的观察者看起来像这样:

  class IAsyncObserver 
{
private:
std :: condition_variable m_ObserverCV;
bool m_bNotified;
std :: mutex m_Mutex;

public:
IAsyncObserver()
{
m_bNotified = false;
}

〜IAsyncObserver()
{
/ * m_bNotified = true;
m_ObserverCV.notify_all(); * /
}

void NotifyOne()
{
std :: unique_lock< std :: mutex>锁定(m_Mutex);
m_bNotified = true;
m_ObserverCV.notify_one();
}

void NotifyAll()
{
std :: unique_lock< std :: mutex>锁定(m_Mutex);
m_bNotified = true;
m_ObserverCV.notify_all();
}

void WaitForNotifications(uint32_t _uNumOfNotifications = 1)
{
uint32_t uNotifyCount = 0;
while(uNotifyCount< _uNumOfNotifications)
{
std :: unique_lock< std :: mutex>锁定(m_Mutex);
m_bNotified = false;
m_ObserverCV.wait(Lock);

if(m_bNotified)
++ uNotifyCount;
}
}

}; // IAsyncObserver

其中_uNumOfNotifications是我想等待的工作线程数。



现在每个工作线程都应该运行一个模拟函数,对一个时间步/数据垃圾进行实际工作,然后暂停/等待,直到观察者通知工人继续。



工人的线程函数可能如下所示:

  do {
//暂停模拟
while(m_PauseSimulation.load())
{
std :: unique_lock< std :: mutex> wait(m_WaitMutex);
m_CV.wait(wait);
if(m_RunSimulation.load()== false)
{
SignalObserver();
return;
}
}

//模拟时锁定数据
{
std :: lock_guard< std :: mutex>锁(m_LockMutex);

//更新模拟
模拟(static_cast< float>(m_fDeltaTime));

m_PauseSimulation.store(true);
}

//通知物理经理工作在这里
SignalObserver();

} while(m_RunSimulation.load());

SignalObserver()只是调用m_Observer-> NotifyOne()。



现在的问题是,一段时间后线程在某处遇到死锁,观察者不通知他们继续下一个时间步。问题可能在WaitForNotifications()函数的某处,但我不确定。 Atm我只有一个工作线程,所以uNumOfNotifications = 1,但它仍然运行到等待在m_ObserverCV.wait(Lock)和m_CV.wait(等待)的问题,我甚至不确定它是否真的是一个死锁或东西因为我试图从几个线程访问它。



现在我想引用Ned Flanders的父亲:我们什么都没试过,想法!



感谢您的帮助。



法比安



编辑:



感谢所有有用的信息&建议。我最终实现了Michael的第二个想法,因为我没有找到任何关于std :: barrier。所以这里是我做的:

  class IAsyncObserver 
{
private:
std: :condition_variable m_ObserverCV;
bool m_bNotified;
std :: mutex m_Mutex;

uint32_t m_uNumOfNotifications;
uint32_t m_uNotificationCount;

public:
IAsyncObserver()
{
m_bNotified = false;
m_uNumOfNotifications = 0;
m_uNonificationCount = 0;
}

〜IAsyncObserver()
{
/ * m_bNotified = true;
m_ObserverCV.notify_all(); * /
}

void SetBarrier(uint32_t _uNumOfNotifications = 1)
{
m_uNumOfNotifications = _uNumOfNotifications;
}

void NotifyBarrier()
{
std :: unique_lock< std :: mutex>锁定(m_Mutex);
if(++ m_uNotificationCount> = m_uNumOfNotifications)
{
m_bNotified = true;
m_ObserverCV.notify_one();
}
}

void WaitForNotifications()
{
std :: unique_lock< std :: mutex>锁定(m_Mutex);
while(m_bNotified == false)
{
m_ObserverCV.wait(Lock);
}
m_uNotificationCount = 0;
}

void NotifyOne()
{
std :: unique_lock< std :: mutex>锁定(m_Mutex);
m_bNotified = true;
m_ObserverCV.notify_one();
}

void NotifyAll()
{
std :: unique_lock< std :: mutex>锁定(m_Mutex);
m_bNotified = true;
m_ObserverCV.notify_all();
}

}; // IAsyncObserver

在我的main函数中:MassSpringSystem和RigidBodySystem是我的工作者atm

  // update systems here:
{
SetBarrier(m_uTotalNotifyCount);

{//启动MassSpringSystems
std :: lock_guard< std :: mutex>锁(m_LockMutex);
for(std :: shared_ptr< MassSpringSystem> MSS:m_MassSpringSystems)
{
MSS-> SetDeltaTime(fDeltaTime);
MSS-> Continue();
}
}

//注意这个系统直接在m_OctreeEntities上工作!
{// start RigidBodySystems
m_RigidBodySystem.SetDeltaTime(fDeltaTime);
m_RigidBodySystem.AddData(m_RigidBodies);
m_RigidBodySystem.Continue();
}

//等待所有系统完成 - >直到他们调用SignalObserver
WaitForNotifications();
}

而且在线程函数中的工人就像上面一样,但这次SignalObserver调用NotifyBarrier()



一切正常。一个简单而强大的解决方案,谢谢!

解决方案

您尝试使用条件变量, - 在这种情况下,您假设您可以计数通知。你不能。您可能会失去通知,并且您计算标准允许的虚假唤醒。



相反,您应该使用在互斥和信令下递增的计数器条件变量只有当计数器达到工人数量时。 (在每个工人的最后这样做)。主线程保持对条件变量进行休眠,直到计数器达到期望值。 (当然,计数器的验证必须完成,持有您用于增量的互斥量)。就我所知,将互斥计数器替换为一个原子(没有互斥它)似乎是不可能的,因为你不能原子地检查计数器和睡眠在condvar,所以你会得到一个竞争条件没有mutexing计数器。 / p>

从boost线程中获知的另一个同步原语是障碍,它没有进入C ++ 11。你构造一个障碍,并传递它的工作线程数量加一作为构造函数参数。所有工作线程都应该在它们结束时等待条件变量,主线程应该在构建工作线程后等待。所有线程将阻塞在该障碍上,直到所有工作线程和主线程阻塞,并且将在那一时刻释放。所以如果主线程被释放,你知道所有的工人都完成了。这有一个问题:没有工作线程完成(并释放相关的管理资源),直到所有工作线程完成,这可能是或可能不是一个问题。 此问题介绍了使用C ++ 11线程设置的boost :: barrier的实现。 p>

my problem looks like this:

I've got a observer which holds a std::condition_variable and a std::mutex, my worker thread objects have a pointer to the observer. Each time a worker thread finished its job it calls m_Observer->NotifyOne() which then calls the notify_one() function of the condition_variable. Now what i want to do is, start a bunch of worker threads each with a different job and different (independant) data and wait for all of them to signal (using m_Observer->NotifyOne()) the observer so that I'm able to continue work based on the results of all the worker threads.

My observer looks like this:

class IAsyncObserver
{
private:
    std::condition_variable m_ObserverCV;
    bool m_bNotified;
    std::mutex m_Mutex;

public:
    IAsyncObserver()
    {
        m_bNotified = false;
    }

    ~IAsyncObserver()
    {
        /*m_bNotified = true;
        m_ObserverCV.notify_all();*/
    }

    void NotifyOne()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_one();
    }

    void NotifyAll()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_all();
    }

    void WaitForNotifications(uint32_t _uNumOfNotifications = 1)
    {
        uint32_t uNotifyCount = 0;
        while (uNotifyCount < _uNumOfNotifications)
        {
            std::unique_lock<std::mutex> Lock(m_Mutex);
            m_bNotified = false;
            m_ObserverCV.wait(Lock);

            if (m_bNotified)
                ++uNotifyCount;
        }
    }

}; // IAsyncObserver

where _uNumOfNotifications is the number of worker threads i want to wait on.

Now each worker thread is supposed to run a simulation function which does the actual work for one timestep/data junk and then pause / wait until the observer notifies the worker to continue.

The thread function of a worker might look like this:

do{
    //suspend simulation
    while (m_PauseSimulation.load())
    {
        std::unique_lock<std::mutex> wait(m_WaitMutex);
        m_CV.wait(wait);
        if (m_RunSimulation.load() == false)
        {
            SignalObserver();
            return;
        }
    }

    //lock the data while simulating
    {
        std::lock_guard<std::mutex> lock(m_LockMutex);

        //update simulation 
        Simulate(static_cast<float>(m_fDeltaTime));

        m_PauseSimulation.store(true);
    }

    //notify the physics manager that work is done here
    SignalObserver();       

} while (m_RunSimulation.load());

SignalObserver() just calls m_Observer->NotifyOne().

Now the problem is that after some time the threads run into a deadlock somewhere and the observer does not notify them to continue with the next time step. The problem probably is somewhere in the WaitForNotifications() function, but I'm not sure. Atm I only have one worker thread so uNumOfNotifications = 1, but it still runs into the problem where it waits at m_ObserverCV.wait(Lock) and m_CV.wait(wait), I'm not even sure if its really a deadlock or something with the condition_variable because i try to access it from several threads.

At this point I'd like to quote Ned Flanders father: "We tried nothing and are all out of ideas!"

Thanks for your help. Ever tip is appreciated.

Fabian

EDIT:

Thanks for all the helpful info & suggestions. I ended up implementing the second idea of Michael since i didnt find anything about std::barriers. So here is what i did:

class IAsyncObserver
{
private:
    std::condition_variable m_ObserverCV;
    bool m_bNotified;
    std::mutex m_Mutex;

    uint32_t m_uNumOfNotifications;
    uint32_t m_uNotificationCount;

public:
    IAsyncObserver()
    {
        m_bNotified = false;
        m_uNumOfNotifications = 0;
        m_uNotificationCount = 0;
    }

    ~IAsyncObserver()
    {
        /*m_bNotified = true;
        m_ObserverCV.notify_all();*/
    }

    void SetBarrier(uint32_t _uNumOfNotifications = 1)
    {
        m_uNumOfNotifications = _uNumOfNotifications;
    }

    void NotifyBarrier()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        if (++m_uNotificationCount >= m_uNumOfNotifications)
        {
            m_bNotified = true;
            m_ObserverCV.notify_one();
        }
    }

    void WaitForNotifications()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        while (m_bNotified == false)
        {
            m_ObserverCV.wait(Lock);
        }
        m_uNotificationCount = 0;
    }

    void NotifyOne()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_one();
    }

    void NotifyAll()
    {
        std::unique_lock<std::mutex> Lock(m_Mutex);
        m_bNotified = true;
        m_ObserverCV.notify_all();
    }

}; // IAsyncObserver

In my "main" function: where MassSpringSystem and RigidBodySystem are my workers atm

    //update systems here:
    {
        SetBarrier(m_uTotalNotifyCount);

        {   //start MassSpringSystems
            std::lock_guard<std::mutex> lock(m_LockMutex);
            for (std::shared_ptr<MassSpringSystem> MSS : m_MassSpringSystems)
            {
                MSS->SetDeltaTime(fDeltaTime);
                MSS->Continue();
            }
        }

        //ATTENTION this system works directly on the m_OctreeEntities!
        {   //start RigidBodySystems
            m_RigidBodySystem.SetDeltaTime(fDeltaTime);
            m_RigidBodySystem.AddData(m_RigidBodies);
            m_RigidBodySystem.Continue();
        }

        //wait for all systems to finish -> till they call SignalObserver
        WaitForNotifications();
    }

And in the thread function of the workers just like above, but this time SignalObserver calls NotifyBarrier()

Everything works fine now. A simple, yet powerful solution, Thanks!

解决方案

You try to use condition variables in a way they are not meant to be used - in this case, you assume that you can count notifications. You can't. You may lose notifications by that, and you are counting spurious wake-ups that are allowed by the standard.

Instead, you should use a counter incremented under a mutex and signalling the condition variable only when the counter reached the number of workers. (Do this in each worker at the end). The main thread keeps sleeping on the condition variable until the counter reaches the expected value. (Of course, verification of the counter has to be done holding the mutex you use for incrementing, too). As far as I can see, replacing the mutexed counter by an atomic (without mutexing it) seems impossible, as you can't atomically check the counter and sleep on the condvar, so you will get a race condition without mutexing the counter.

Another synchronization primitive known from boost threads is the barrier, which did not get into C++11. You construct a barrier, and pass it the number of worker threads plus one as constructor argument. All worker threads should wait for the condition variable at their end and the main thread should wait after constructing the workers. All threads will block on that barrier, until all worker threads and the main thread are blocking, and will be released at that moment. So if the main thread is released, you know that all workers finished. This has one problem though: No worker thread is finished (and freeing associated management resources) until all worker threads are finshed, which may or may not be a problem for you. This question presents an implementation of boost::barrier using C++11 threading facilities.

这篇关于std :: condition_variable - 等待几个线程通知观察者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆