在C ++ 11中等待多个条件变量的最好方法是什么? [英] What is the best way to wait on multiple condition variables in C++11?

查看:1489
本文介绍了在C ++ 11中等待多个条件变量的最好方法是什么?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

第一个上下文:我正在学习C ++ 11中的线程,为此,我试图构建一个小 actor 类,基本上(我离开异常处理和传播的东西出来)如下:

First a little context: I'm in the process of learning about threading in C++11 and for this purpose, I'm trying to build a small actor class, essentially (I left the exception handling and propagation stuff out) like so:

class actor {
    private: std::atomic<bool> stop;
    private: std::condition_variable interrupt;
    private: std::thread actor_thread;
    private: message_queue incoming_msgs;

    public: actor() 
    : stop(false), 
      actor_thread([&]{ run_actor(); })
    {}

    public: virtual ~actor() {
        // if the actor is destroyed, we must ensure the thread dies too
        stop = true;
        // to this end, we have to interrupt the actor thread which is most probably
        // waiting on the incoming_msgs queue:
        interrupt.notify_all();
        actor_thread.join();
    }

    private: virtual void run_actor() {
        try {
            while(!stop)
                // wait for new message and process it
                // but interrupt the waiting process if interrupt is signaled:
                process(incoming_msgs.wait_and_pop(interrupt));
        } 
        catch(interrupted_exception) {
            // ...
        }
    };

    private: virtual void process(const message&) = 0;
    // ...
};

每个actor都运行在自己的 actor_thread 等待在 incoming_msgs 上的新传入消息,以及 - 当消息到达时,处理它。

Every actor runs in its own actor_thread, waits on a new incoming message on incoming_msgs and -- when a message arrives -- processes it.

actor_thread actor 必须与它一起死,这就是为什么我需要某种中断机制在 message_queue :: wait_and_pop(std :: condition_variable中断)

The actor_thread is created together with the actor and has to die together with it, which is why I need some kind of interrupt mechanism in the message_queue::wait_and_pop(std::condition_variable interrupt).

基本上,我需要 wait_and_pop 阻塞,直到
a)a new 消息到达或
b),直到中断被触发,在这种情况下 - 理想地 - interrupted_exception

Essentially, I require that wait_and_pop blocks until either a) a new message arrives or b) until the interrupt is fired, in which case -- ideally -- an interrupted_exception is to be thrown.

新消息到达 message_queue 目前还由 std :: condition_variable new_msg_notification

// ...
// in class message_queue:
message wait_and_pop(std::condition_variable& interrupt) {
    std::unique_lock<std::mutex> lock(mutex);

    // How to interrupt the following, when interrupt fires??
    new_msg_notification.wait(lock,[&]{
        return !queue.empty();
    });
    auto msg(std::move(queue.front()));
    queue.pop();
    return msg;
}

为了简化故事,问题是这样的:当中断 new_msg_notification.wait(...) >

To cut the long story short, the question is this: How do I interrupt the waiting for a new message in new_msg_notification.wait(...) when the interrupt is triggered (without introducing a time-out)?

或者,问题可以解读为:如何等待两个 std :: condition_variable s信号?

Alternatively, the question may be read as: How do I wait until any one of two std::condition_variables are signaled?

一个朴素的方法似乎不是使用 std :: condition_variable 原子标志 std :: atomic< bool>中断然后忙于等待 new_msg_notification ,只有一个非常小的超时,直到一个新消息到达或直到 true == interrupted

One naive approach seems to be not to use std::condition_variable at all for the interrupt and instead just use an atomic flag std::atomic<bool> interrupted and then busy wait on new_msg_notification with a very small time-out until either a new message has arrived or until true==interrupted. However, I would very much like to avoid busy waiting.

从pilcrow的评论和回答看,基本上有两种方法可能。

From the comments and the answer by pilcrow, it looks like there are basically two approaches possible.


  1. 终止消息,由Alan,mukunda和pilcrow提出。我决定反对这个选项,因为我不知道在我想要actor终止的队列的大小。它可能是非常好的(因为它是大多数情况下,当我想要的东西快速终止),有成千上万的消息留在队列中处理,似乎不可接受等待他们被处理,直到终止消息得到它

  2. 实现条件变量的自定义版本,可能会被另一个线程中断,将通知转发到第一个线程正在等待的条件变量。我选择了这种方法。

对于你感兴趣的人,我的实现如下。我的情况下的条件变量实际上是一个信号量(因为我更喜欢他们,因为我喜欢这样做的练习)。我为这个信号量配备了一个相关的中断,它可以通过 semaphore :: get_interrupt()从信号量获取。如果现在一个线程阻塞在 semaphore :: wait(),另一个线程有可能调用 semaphore :: interrupt :: trigger() code>在信号量的中断,导致第一个线程解除阻塞并传播 interrupt_exception

For those of you interested, my implementation goes as follows. The condition variable in my case is actually a semaphore (because I like them more and because I liked the exercise of doing so). I equipped this semaphore with an associated interrupt which can be obtained from the semaphore via semaphore::get_interrupt(). If now one thread blocks in semaphore::wait(), another thread has the possibility to call semaphore::interrupt::trigger() on the interrupt of the semaphore, causing the first thread to unblock and propagate an interrupt_exception.

struct
interrupt_exception {};

class
semaphore {
    public: class interrupt;
    private: mutable std::mutex mutex;

    // must be declared after our mutex due to construction order!
    private: interrupt* informed_by;
    private: std::atomic<long> counter;
    private: std::condition_variable cond;

    public: 
    semaphore();

    public: 
    ~semaphore() throw();

    public: void 
    wait();

    public: interrupt&
    get_interrupt() const { return *informed_by; }

    public: void
    post() {
        std::lock_guard<std::mutex> lock(mutex);
        counter++;
        cond.notify_one(); // never throws
    }

    public: unsigned long
    load () const {
        return counter.load();
    }
};

class
semaphore::interrupt {
    private: semaphore *forward_posts_to;
    private: std::atomic<bool> triggered;

    public:
    interrupt(semaphore *forward_posts_to) : triggered(false), forward_posts_to(forward_posts_to) {
        assert(forward_posts_to);
        std::lock_guard<std::mutex> lock(forward_posts_to->mutex);
        forward_posts_to->informed_by = this;
    }

    public: void
    trigger() {
        assert(forward_posts_to);
        std::lock_guard<std::mutex>(forward_posts_to->mutex);

        triggered = true;
        forward_posts_to->cond.notify_one(); // never throws
    }

    public: bool
    is_triggered () const throw() {
        return triggered.load();
    }

    public: void
    reset () throw() {
        return triggered.store(false);
    }
};

semaphore::semaphore()  : counter(0L), informed_by(new interrupt(this)) {}

// must be declared here because otherwise semaphore::interrupt is an incomplete type
semaphore::~semaphore() throw()  {
    delete informed_by;
}

void
semaphore::wait() {
    std::unique_lock<std::mutex> lock(mutex);
    if(0L==counter) {
        cond.wait(lock,[&]{
            if(informed_by->is_triggered())
                throw interrupt_exception();
            return counter>0;
        });
    }
    counter--;
}

使用 semaphore ,我的消息队列实现现在看起来像这样(使用信号量而不是 std :: condition_variable 我可以摆脱 std :: mutex

Using this semaphore, my message queue implementation now looks like this (using the semaphore instead of the std::condition_variable I could get rid of the std::mutex:

class
message_queue {    
    private: std::queue<message> queue;
    private: semaphore new_msg_notification;

    public: void
    push(message&& msg) {
        queue.push(std::move(msg));
        new_msg_notification.post();
    }

    public: const message
    wait_and_pop() {
        new_msg_notification.wait();
        auto msg(std::move(queue.front()));
        queue.pop();
        return msg;
    }

    public: semaphore::interrupt&
    get_interrupt() const { return new_msg_notification.get_interrupt(); }
};

我的 actor 现在能够在其线程中以非常低的延迟中断其线程。目前的实现如下:

My actor, is now able to interrupt its thread with very low latency in its thread. The implementation presently like this:

class
actor {
    private: message_queue
    incoming_msgs;

    /// must be declared after incoming_msgs due to construction order!
    private: semaphore::interrupt&
    interrupt;

    private: std::thread
    my_thread;

    private: std::exception_ptr
    exception;

    public:
    actor()
    : interrupt(incoming_msgs.get_interrupt()), my_thread(
        [&]{
            try {
                run_actor();
            }
            catch(...) {
                exception = std::current_exception();
            }
        })
    {}

    private: virtual void
    run_actor() {
        while(!interrupt.is_triggered())
            process(incoming_msgs.wait_and_pop());
    };

    private: virtual void
    process(const message&) = 0;

    public: void
    notify(message&& msg_in) {
        incoming_msgs.push(std::forward<message>(msg_in));
    }

    public: virtual
    ~actor() throw (interrupt_exception) {
        interrupt.trigger();
        my_thread.join();
        if(exception)
            std::rethrow_exception(exception);
    }
};

我还在 http://goo.gl/2GDAa1

推荐答案

您问:


在C ++ 11中等待多个条件变量的最佳方法是什么?

您不能,必须重新设计。一个线程可以一次只等待一个条件变量(及其关联的互斥体)。在这方面,用于同步的Windows设施比POSIX样式同步原语族更加丰富。

You can't, and must redesign. One thread may wait on only one condition variable (and its associated mutex) at a time. In this regard the Windows facilities for synchronization are rather richer than those of the "POSIX-style" family of synchronization primitives.

线程安全队列的典型方法是入列一个特别的全部完成!消息,或设计可断开(或可关闭)队列。在后一种情况下,队列的内部条件变量保护一个复杂的谓词:一个项目可用队列已被破坏。

The typical approach with thread-safe queues is to enqueue a special "all done!" message, or to design a "breakable" (or "shutdown-able") queue. In the latter case, the queue's internal condition variable then protects a complex predicate: either an item is available or the queue has been broken.

在评论中,您发现


如果没有人在等待,notify_all / p>

a notify_all() will have no effect if there is no one waiting

这是真的,但可能不相关。 wait()对条件变量也意味着检查一个谓词,并且在实际阻止通知之前检查它。因此,工作线程忙于处理一个未命中一个 notify_all()的队列项,将在下次检查队列条件时看到谓词可用,或者,队列已完成)已更改。

That's true but probably not relevant. wait()ing on a condition variable also implies checking a predicate, and checking it before actually blocking for a notification. So, a worker thread busy processing a queue item that "misses" a notify_all() will see, the next time it inspects the queue condition, that the predicate (a new item is available, or, the queue is all done) has changed.

这篇关于在C ++ 11中等待多个条件变量的最好方法是什么?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆