std :: promise和std :: future的不明显的生命周期问题 [英] Non-obvious lifetime issue with std::promise and std::future

查看:1643
本文介绍了std :: promise和std :: future的不明显的生命周期问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

此问题与之前的一个问题非常相似:pthread_once() 中的竞态条件?



这本质上是同一个问题——std::promise 的生命周期在调用 promise::set_value 的过程中结束(即:在关联的 future 已被标记之后,但在 pthread_once 执行之前)。

所以我知道我的用法有这个问题,因此我不能这样使用它。但是,我认为这是不明显的。



我在下面提供一个范例:


  • 我有一个线程(dispatcher),它在一个队列上循环,弹出一个"作业"(一个 std::function)并执行它。

  • 我有一个名为 synchronous_job 的工具类,它会阻塞调用线程,直到该"作业"在 dispatcher 线程上执行完毕。

  • std::promise 和 std::future 是 synchronous_job 的成员——一旦 future 被设置,被阻塞的调用线程就会继续执行,这导致 synchronous_job 从栈上弹出并被析构。

  • 不幸的是,此时 dispatcher 线程恰好在 promise::set_value 内部被上下文切换了;future 已被标记,但对 pthread_once 的调用尚未执行,pthread 栈不知何故被破坏,这意味着下一次循环时:死锁。



我会期望调用 promise :: set_value 为原子;在它标记未来之后需要做更多的工作的事实将不可避免地导致这种类型的问题。



所以我的问题是:如何使用 std::promise 和 std::future 来实现这种同步,同时让它们的生命周期与提供这种同步机制的类保持关联?



@Jonathan Wakely,你可以在内部使用一些RAII风格的类吗?在它标记 future 之后,在其析构函数中设置 condition_variable ?这意味着即使 promise 在调用 set_value 的过程中被销毁,设置的额外工作条件变量将正确完成。只是一个想法,不知道是否可以使用它...



下面是一个完整的工作示例,下面是死锁应用的堆栈跟踪:

#include <iostream>
#include <thread>
#include <future>
#include <queue>

// Single-threaded job queue: callables handed to post() are executed one at a
// time, in FIFO order, on a worker thread owned by the dispatcher.
//
// The destructor lets the worker finish every job that is already queued and
// then joins it. Without that, destroying a dispatcher would destroy a
// still-joinable std::thread, which calls std::terminate().
//
// Do not destroy a dispatcher from inside one of its own jobs: the destructor
// would then try to join the thread it is running on.
struct dispatcher
{
    dispatcher()
        : _thread(&dispatcher::loop, this)
    {
    }

    // Asks the worker to stop once the queue is empty and waits for it.
    ~dispatcher()
    {
        {
            std::lock_guard<std::mutex> l(_mtx);
            _stop = true;
        }
        _cnd.notify_one();
        _thread.join();
    }

    dispatcher(const dispatcher&) = delete;
    dispatcher& operator=(const dispatcher&) = delete;

    // Queues `job` for execution on the worker thread and returns immediately.
    void post(std::function<void()> job)
    {
        {
            std::lock_guard<std::mutex> l(_mtx);
            _jobs.push(std::move(job));
        }
        // Notify after unlocking so the woken worker does not immediately
        // block on the mutex we still hold.
        _cnd.notify_one();
    }
private:
    // Worker thread body: pop one job under the lock, run it without the lock.
    void loop()
    {
        for (;;)
        {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> l(_mtx);
                _cnd.wait(l, [this] { return _stop || !_jobs.empty(); });
                if (_jobs.empty())
                    return; // only reachable once _stop is set and the queue is drained
                job = std::move(_jobs.front());
                _jobs.pop();
            }
            job();
        }
    }

    std::mutex                        _mtx;
    std::condition_variable           _cnd;
    std::queue<std::function<void()>> _jobs;
    bool                              _stop = false; // guarded by _mtx
    // Declared last: the worker starts in the constructor's initializer list
    // and must only ever see fully constructed members above.
    std::thread                       _thread;
};
// -------------------------------------------- -----------------

struct synchronous_job
{
synchronous_job(std :: function< void()> dispatcher& d)
:_job(job)
,_d(d)
,_f(_p.get_future())
{
}
run()
{
_d.post(std :: bind(& synchronous_job :: cb,this));
_f.wait();
}
private:
void cb()
{
_job();
_p.set_value();
}
std :: function< void()> _工作;
dispatcher& _d;
std :: promise< void> _p;
std :: future< void> _F;
};
// -------------------------------------------- -----------------

struct test
{
test()
:_count(0)
{
}
void run()
{
synchronous_job job(std :: bind(& test :: cb,this),_d);
job.run();
}
private:
void cb()
{
std :: cout< ++ _ count<< std :: endl;
}
int _count;
dispatcher _d;
};
// -------------------------------------------- -----------------

int main()
{
test t;
for(;;)
{
t.run();
}
}

死锁应用程序的堆栈跟踪:



线程 1(主线程)

#0  0x00007fa112ed750c in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007fa112a308ec in __gthread_cond_wait (__mutex=<optimized out>, __cond=<optimized out>) at /hostname/tmp/syddev/Build/gcc-4.6.2/gcc-build/x86_64-unknown-linux-gnu/libstdc++-v3/include/x86_64-unknown-linux-gnu/bits/gthr-default.h:846
#2  std::condition_variable::wait (this=<optimized out>, __lock=...) at ../../../../libstdc++-v3/src/condition_variable.cc:56
#3  0x00000000004291d9 in std::condition_variable::wait<std::__future_base::_State_base::wait()::{lambda()#1}>(std::unique_lock<std::mutex>&, std::__future_base::_State_base::wait()::{lambda()#1}) (this=0x78e050, __lock=..., __p=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/condition_variable:93
#4  0x00000000004281a8 in std::__future_base::_State_base::wait (this=0x78e018) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:331
#5  0x000000000042a2d6 in std::__basic_future<void>::wait (this=0x7fff0ae515c0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:576
#6  0x0000000000428dd8 in synchronous_job::run (this=0x7fff0ae51580) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:60
#7  0x0000000000428f97 in test::run (this=0x7fff0ae51660) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:83
#8  0x0000000000427ad6 in main () at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:99

线程 2(dispatcher 线程)

#0  0x00007fa112ed8b5b in pthread_once () from /lib64/libpthread.so.0
#1  0x0000000000427946 in __gthread_once (__once=0x78e084, __func=0x4272d0 <__once_proxy@plt>) at /hostname/sdk/gcc470/suse11/x86_64/bin/../lib/gcc/x86_64-unknown-linux-gnu/4.7.0/../../../../include/c++/4.7.0/x86_64-unknown-linux-gnu/bits/gthr-default.h:718
#2  0x000000000042948b in std::call_once<void (std::__future_base::_State_base::*)(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&), std::__future_base::_State_base* const, std::reference_wrapper<std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()> >, std::reference_wrapper<bool> >(std::once_flag&, void (std::__future_base::_State_base::*&&)(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&), std::__future_base::_State_base* const&&, std::reference_wrapper<std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()> >&&, std::reference_wrapper<bool>&&) (__once=..., __f=
    @0x7fa111ff6be0: (void (std::__future_base::_State_base::*)(std::__future_base::_State_base * const, std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter>()> &, bool &)) 0x42848a <std::__future_base::_State_base::_M_do_set(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&)>) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/mutex:819
#3  0x000000000042827d in std::__future_base::_State_base::_M_set_result(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>, bool) (this=0x78e018, __res=..., __ignore_failure=false) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:362
#4  0x00000000004288d5 in std::promise<void>::set_value (this=0x7fff0ae515a8) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:1206
#5  0x0000000000428e2a in synchronous_job::cb (this=0x7fff0ae51580) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:66
#6  0x000000000042df53 in std::_Mem_fn<void (synchronous_job::*)()>::operator() (this=0x78c6e0, __object=0x7fff0ae51580) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:554
#7  0x000000000042d77c in std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)>::__call<void, , 0ul>(std::tuple<>&&, std::_Index_tuple<0ul>) (this=0x78c6e0, __args=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1156
#8  0x000000000042cb28 in std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)>::operator()<, void>() (this=0x78c6e0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1215
#9  0x000000000042b772 in std::_Function_handler<void (), std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)> >::_M_invoke(std::_Any_data const&) (__functor=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1926
#10 0x0000000000429f2c in std::function<void ()>::operator()() const (this=0x7fa111ff6da0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:2311
#11 0x0000000000428c3c in dispatcher::loop (this=0x7fff0ae51668) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:39


解决方案

std::promise 和任何其他对象一样:同一时刻只能有一个线程访问它。在这种情况下,你在不同的线程上分别调用 set_value() 和销毁该对象,而两者之间没有足够的同步:规范中没有任何地方说 set_value 在使 future 就绪之后不会再访问 promise 对象。

然而,由于这个 future 只用于一次性的同步,你本来也不需要这样做:直接在 run() 中创建 promise/future 对,并将 promise 传递给线程:

  struct synchronous_job 
{
synchronous_job(std :: function< void()> job,dispatcher& d)
:_job(job)
,_d(d)
{
} $ b b void run(){
std :: promise< void> p;
std :: future< void> f = p.get_future();

_d.post(
[&] {
cb(std :: move(p));
}

f.wait();
}
private:
void cb(std :: promise< void> p)
{
_job
p.set_value();
}
std :: function< void()> _工作;
dispatcher& _d;
};


This question is very similar to a previous one here: race-condition in pthread_once()?

It is essentially the same issue - the lifetime of a std::promise ending during a call to promise::set_value (ie: after the associated future has been flagged, but before pthread_once has executed)

So I know that my usage has this issue, and that I therefore cannot use it in this way. However, I think this is non-obvious. (In the wise words of Scott Meyers: Make Interfaces Easy to Use Correctly and Hard to Use Incorrectly)

I present an exemplar below:

  • I have a thread (dispatcher) which spins on a queue, popping a 'job' (a std::function) and executing it.
  • I have a utility class called synchronous_job which blocks the calling thread until the 'job' has been executed on the dispatcher thread
  • The std::promise and std::future are members of synchronous_job - once the future is set, the blocked calling thread continues, which results in the synchronous_job popping off the stack and being destructed.
  • Unfortunately, at this time the dispatcher was context switched whilst inside promise::set_value; the future is flagged, but the call to pthread_once hasn't executed, and the pthread stack is somehow corrupted, meaning next time around: deadlock

I would expect a call to promise::set_value to be atomic; the fact that it needs to do more work after it has flagged the future will inevitably lead to this kind of issue when using these classes in this manner.

So my question is: How to achieve this kind of synchronisation using std::promise and std::future, keeping their lifetime associated with the class which provides this synchronisation mechanism?

@Jonathan Wakely, could you perhaps use some RAII-style class internally which sets the condition_variable in its destructor after it flags the future? This would mean that even if the promise is destructed in the midst of a call to set_value, the additional work of setting the condition variable would complete correctly. Just an idea, not sure if you can use it...

A full working example below, and the stack trace of the deadlocked app after:

#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Single-threaded job queue: callables handed to post() are executed one at a
// time, in FIFO order, on a worker thread owned by the dispatcher.
//
// The destructor lets the worker finish every job that is already queued and
// then joins it. Without that, destroying a dispatcher would destroy a
// still-joinable std::thread, which calls std::terminate().
//
// Do not destroy a dispatcher from inside one of its own jobs: the destructor
// would then try to join the thread it is running on.
struct dispatcher
{
    dispatcher()
        : _thread(&dispatcher::loop, this)
    {
    }

    // Asks the worker to stop once the queue is empty and waits for it.
    ~dispatcher()
    {
        {
            std::lock_guard<std::mutex> l(_mtx);
            _stop = true;
        }
        _cnd.notify_one();
        _thread.join();
    }

    dispatcher(const dispatcher&) = delete;
    dispatcher& operator=(const dispatcher&) = delete;

    // Queues `job` for execution on the worker thread and returns immediately.
    void post(std::function<void()> job)
    {
        {
            std::lock_guard<std::mutex> l(_mtx);
            _jobs.push(std::move(job));
        }
        // Notify after unlocking so the woken worker does not immediately
        // block on the mutex we still hold.
        _cnd.notify_one();
    }
private:
    // Worker thread body: pop one job under the lock, run it without the lock.
    void loop()
    {
        for (;;)
        {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> l(_mtx);
                _cnd.wait(l, [this] { return _stop || !_jobs.empty(); });
                if (_jobs.empty())
                    return; // only reachable once _stop is set and the queue is drained
                job = std::move(_jobs.front());
                _jobs.pop();
            }
            job();
        }
    }

    std::mutex                        _mtx;
    std::condition_variable           _cnd;
    std::queue<std::function<void()>> _jobs;
    bool                              _stop = false; // guarded by _mtx
    // Declared last: the worker starts in the constructor's initializer list
    // and must only ever see fully constructed members above.
    std::thread                       _thread;
};
//-------------------------------------------------------------

// Runs a job on the dispatcher thread and blocks the calling thread until the
// job has been executed.
//
// NOTE: this version keeps the promise and the future as members, which makes
// it unsafe when the object is destroyed as soon as run() returns: _f.wait()
// can return while the dispatcher thread is still inside _p.set_value(), and
// nothing in the standard promises that set_value() is finished with the
// promise object once the future has become ready.
struct synchronous_job
{
    // job: callable to execute on the dispatcher thread.
    // d:   dispatcher that will execute it; held by reference.
    synchronous_job(std::function<void()> job, dispatcher& d)
        : _job(job)
        , _d(d)
        // _p is declared before _f, so it is already constructed here.
        , _f(_p.get_future())
    {
    }
    // Posts cb() to the dispatcher and blocks until the future becomes ready.
    void run()
    {
        _d.post(std::bind(&synchronous_job::cb, this));
        _f.wait();
    }
private:
    // Executed by the dispatcher: runs the job, then releases the waiter.
    void cb()
    {
        _job();
        // The waiter may wake up (and destroy *this) before this call returns.
        _p.set_value();
    }
    std::function<void()> _job;
    dispatcher&           _d;
    std::promise<void>    _p;
    std::future<void>     _f;
};
//-------------------------------------------------------------

struct test
{
    test()
        : _count(0)
    {
    }
    void run()
    {
        synchronous_job job(std::bind(&test::cb, this), _d);
        job.run();
    }
private:
    void cb()
    {
        std::cout << ++_count << std::endl;
    }
    int _count;
    dispatcher _d;
};
//-------------------------------------------------------------

int main()
{
    test t;
    for (;;)
    {
        t.run();
    }
}

The stack trace of the deadlocked app:

Thread 1 (main thread)

#0  0x00007fa112ed750c in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007fa112a308ec in __gthread_cond_wait (__mutex=<optimized out>, __cond=<optimized out>) at /hostname/tmp/syddev/Build/gcc-4.6.2/gcc-build/x86_64-unknown-linux-gnu/libstdc++-v3/include/x86_64-unknown-linux-gnu/bits/gthr-default.h:846
#2  std::condition_variable::wait (this=<optimized out>, __lock=...) at ../../../../libstdc++-v3/src/condition_variable.cc:56
#3  0x00000000004291d9 in std::condition_variable::wait<std::__future_base::_State_base::wait()::{lambda()#1}>(std::unique_lock<std::mutex>&, std::__future_base::_State_base::wait()::{lambda()#1}) (this=0x78e050, __lock=..., __p=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/condition_variable:93
#4  0x00000000004281a8 in std::__future_base::_State_base::wait (this=0x78e018) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:331
#5  0x000000000042a2d6 in std::__basic_future<void>::wait (this=0x7fff0ae515c0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:576
#6  0x0000000000428dd8 in synchronous_job::run (this=0x7fff0ae51580) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:60
#7  0x0000000000428f97 in test::run (this=0x7fff0ae51660) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:83
#8  0x0000000000427ad6 in main () at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:99

Thread 2 (dispatcher thread)

#0  0x00007fa112ed8b5b in pthread_once () from /lib64/libpthread.so.0
#1  0x0000000000427946 in __gthread_once (__once=0x78e084, __func=0x4272d0 <__once_proxy@plt>) at /hostname/sdk/gcc470/suse11/x86_64/bin/../lib/gcc/x86_64-unknown-linux-gnu/4.7.0/../../../../include/c++/4.7.0/x86_64-unknown-linux-gnu/bits/gthr-default.h:718
#2  0x000000000042948b in std::call_once<void (std::__future_base::_State_base::*)(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&), std::__future_base::_State_base* const, std::reference_wrapper<std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()> >, std::reference_wrapper<bool> >(std::once_flag&, void (std::__future_base::_State_base::*&&)(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&), std::__future_base::_State_base* const&&, std::reference_wrapper<std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()> >&&, std::reference_wrapper<bool>&&) (__once=..., __f=
    @0x7fa111ff6be0: (void (std::__future_base::_State_base::*)(std::__future_base::_State_base * const, std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter>()> &, bool &)) 0x42848a <std::__future_base::_State_base::_M_do_set(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>&, bool&)>) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/mutex:819
#3  0x000000000042827d in std::__future_base::_State_base::_M_set_result(std::function<std::unique_ptr<std::__future_base::_Result_base, std::__future_base::_Result_base::_Deleter> ()>, bool) (this=0x78e018, __res=..., __ignore_failure=false) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:362
#4  0x00000000004288d5 in std::promise<void>::set_value (this=0x7fff0ae515a8) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/future:1206
#5  0x0000000000428e2a in synchronous_job::cb (this=0x7fff0ae51580) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:66
#6  0x000000000042df53 in std::_Mem_fn<void (synchronous_job::*)()>::operator() (this=0x78c6e0, __object=0x7fff0ae51580) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:554
#7  0x000000000042d77c in std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)>::__call<void, , 0ul>(std::tuple<>&&, std::_Index_tuple<0ul>) (this=0x78c6e0, __args=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1156
#8  0x000000000042cb28 in std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)>::operator()<, void>() (this=0x78c6e0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1215
#9  0x000000000042b772 in std::_Function_handler<void (), std::_Bind<std::_Mem_fn<void (synchronous_job::*)()> (synchronous_job*)> >::_M_invoke(std::_Any_data const&) (__functor=...) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:1926
#10 0x0000000000429f2c in std::function<void ()>::operator()() const (this=0x7fa111ff6da0) at /hostname/sdk/gcc470/suse11/x86_64/include/c++/4.7.0/functional:2311
#11 0x0000000000428c3c in dispatcher::loop (this=0x7fff0ae51668) at /home/lorimer/p4/Main/Source/Trading/Confucius/Test/Scratch/Test1/main.cpp:39

解决方案

std::promise is just like any other object: you can only access it from one thread at a time. In this case, you are calling set_value() and destroying the object from separate threads without sufficient synchronization: nowhere in the spec does it say that set_value will not touch the promise object after making the future ready.

However, since this future is used for a one-shot synchronization, you don't need to do that anyway: create the promise/future pair right in run(), and pass the promise to the thread:

// Runs a job on the dispatcher thread and blocks the calling thread until the
// job has finished.
//
// The promise/future pair is local to run() and the promise is moved into
// cb() on the dispatcher thread, so the promise that set_value() operates on
// is never destroyed by the waiting thread.
struct synchronous_job
{
    // job: callable to execute on the dispatcher thread.
    // d:   dispatcher that will execute it; held by reference.
    synchronous_job(std::function<void()> job, dispatcher& d)
        : _job(std::move(job))
        , _d(d)
    {
    }

    // Posts the job and blocks until it has run. If the job throws, the
    // exception is rethrown here, on the calling thread, instead of escaping
    // from the dispatcher's thread function.
    void run()
    {
        std::promise<void> p;
        std::future<void> f = p.get_future();

        // Capturing by reference is safe: run() does not return until the
        // future is ready, and that only happens after the lambda has moved
        // p into cb().
        _d.post(
            [&]{
                cb(std::move(p));
            });

        f.get();
    }
private:
    // Executed by the dispatcher; owns the promise for the whole call.
    void cb(std::promise<void> p)
    {
        try
        {
            _job();
        }
        catch (...)
        {
            p.set_exception(std::current_exception());
            return;
        }
        p.set_value();
    }
    std::function<void()> _job;
    dispatcher&           _d;
};

这篇关于std :: promise和std :: future的不明显的生命周期问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆