Adding blocking functions to lock-free queue


Question


I have a lock-free multi producer, single consumer queue, based on a circular buffer. So far, it only has non-blocking push_back() and pop_front() calls. Now I want to add blocking versions of those calls, but I want to minimize the impact this has on the performance of code that uses the non-blocking versions - namely, it should not turn them into "lock-by-default" calls.

E.g. the simplest version of a blocking push_back() would look like this:

void push_back_Blocking(const T& pkg) {
    if (!push_back(pkg)) {
        unique_lock<mutex> ul(mux);
        while (!push_back(pkg)) {
            cv_notFull.wait(ul);
        }
    }
}

but unfortunately this would also require to put the following block at the end of the "non-blocking" pop_front():

{
    std::lock_guard<mutex> lg(mux);
    cv_notFull.notify_all();
}

While the notify alone has hardly any performance impact (if no thread is waiting), the lock has.

So my question is:
How can I (using standard C++14 if possible) add blocking push_back and pop_front member functions to my queue without severely impeding the performance of the non-blocking counterparts (read: minimize system calls)? At least as long as no thread is actually blocked - but ideally even then.


For reference, my current version looks similar to this (I left out debug checks, data alignment and explicit memory orderings):

template<class T, size_t N>
class MPSC_queue {
    using INDEX_TYPE = unsigned long;
    struct Idx {
        INDEX_TYPE idx;
        INDEX_TYPE version_cnt;
    };
    enum class SlotState {
        EMPTY,
        FILLED
    };
    struct Slot {
        Slot() = default;               
        std::atomic<SlotState> state= SlotState::EMPTY;
        T data{};
    };
    struct Buffer_t {
        std::array<Slot, N> data{}; 
        Buffer_t() {
            data.fill(Slot{ SlotState::EMPTY, T{} });
        }
        Slot& operator[](Idx idx) {
            return this->operator[](idx.idx);
        }
        Slot& operator[](INDEX_TYPE idx) {
            return data[idx];                   
        }
    };

    Buffer_t buffer;
    std::atomic<Idx> head{};
    std::atomic<INDEX_TYPE> tail=0;

    INDEX_TYPE next(INDEX_TYPE old) { return (old + 1) % N; }

    Idx next(Idx old) {
        old.idx = next(old.idx);
        old.version_cnt++;
        return old;
    }
public:     
    bool push_back(const T& val) {
        auto tHead = head.load();
        Idx wrtIdx;
        do {
            wrtIdx = next(tHead);
            if (wrtIdx.idx == tail) {
                return false;
            }
        } while (!head.compare_exchange_strong(tHead, wrtIdx));

        buffer[wrtIdx].data = val;
        buffer[wrtIdx].state = SlotState::FILLED;
        return true;
    }

    bool pop_front(T& val) {                
        auto rIdx = next(tail);
        if (buffer[rIdx].state != SlotState::FILLED) {
            return false;
        }
        val = buffer[rIdx].data;
        buffer[rIdx].state = SlotState::EMPTY;
        tail = rIdx;
        return true;
    }
};


Related questions:

I asked a similar question, specifically about optimizing the usage of condition_variable::notify, here, but that question got closed as a supposed duplicate of this question.
I disagree, because that question was about why the mutex is needed for condition variables in general (or rather its pthread equivalent) - focusing on condition_variable::wait - and not about if/how it can be avoided for the notify part. But apparently I didn't make that sufficiently clear (or people just disagreed with my opinion).

In any case, the answers in the linked question did not help me and as this was somewhat of an XY-problem anyway, I decided to ask another question about the actual problem I have and thus allow a wider range of possible solutions (maybe there is a way to avoid condition variables altogether).

This question is also very similar, but

  1. It is about C on Linux and the answers use platform-specific constructs (pthreads and futexes)
  2. The author there asked for efficient blocking calls, but no non-blocking ones at all. I, on the other hand, don't care too much about the efficiency of the blocking ones but want to keep the non-blocking ones as fast as possible.

Solution

If there is a potential waiter on the condition variable, you have to lock the mutex for the notify_all call.

The thing is that the condition check (!push_back(pkg)) is performed before waiting on the condition variable (C++11 provides no other way). So the mutex is the only means that can guarantee consistency between these actions.

But it is possible to omit the locking (and notification) in the case when no potential waiter is involved. Just use an additional flag:

class MPSC_queue {
    ... // Original definitions
    std::atomic<bool> has_waiters;

public:
    void push_back_Blocking(const T& pkg) {
        if (!push_back(pkg)) {
            unique_lock<mutex> ul(mux);
            has_waiters.store(true, std::memory_order_relaxed); // #1
            while (!push_back(pkg)) { // #2 inside push_back() method
                cv_notFull.wait(ul);
                // Other waiter may clean flag while we wait. Set it again. Same as #1.
                has_waiters.store(true, std::memory_order_relaxed);
            }
            has_waiters.store(false, std::memory_order_relaxed);
        }
    }

    // Method is same as original, exposed only for #2 mark.
    bool push_back(const T& val) {
        auto tHead = head.load();
        Idx wrtIdx;
        do {
            wrtIdx = next(tHead);
            if (wrtIdx.idx == tail) { // #2
                return false;
            }
        } while (!head.compare_exchange_strong(tHead, wrtIdx));

        buffer[wrtIdx].data = val;
        buffer[wrtIdx].state = SlotState::FILLED;
        return true;
    }

    bool pop_front(T& val) {
        // Main work, same as original pop_front, exposed only for #3 mark.
        auto rIdx = next(tail);
        if (buffer[rIdx].state != SlotState::FILLED) {
            return false;
        }
        val = buffer[rIdx].data;
        buffer[rIdx].state = SlotState::EMPTY;
        tail = rIdx; // #3

        // Notification part
        if(has_waiters.load(std::memory_order_relaxed)) // #4
        {
            // There are potential waiters. Need to lock.
            std::lock_guard<mutex> lg(mux);
            cv_notFull.notify_all();
        }

        return true;
    }
};

Key relations here are:

  1. Setting the flag at #1 and reading tail for the condition check at #2.
  2. Storing tail at #3 and checking the flag at #4.

Both of these relations should expose some sort of universal order. That is, #1 should be observed before #2 even by another thread. The same holds for #3 and #4.

In that case one can guarantee that, if the flag check at #4 finds the flag not set, then any subsequent condition check at #2 will observe the effect of the condition change at #3. So it is safe not to lock (and notify), because no waiter is possible.

In your current implementation, the universal order between #1 and #2 is provided by loading tail with the implicit memory_order_seq_cst. The same order between #3 and #4 is provided by storing tail with the implicit memory_order_seq_cst.

In this "do not lock if there are no waiters" approach, the universal order is the trickiest part. In both relations it is a Read After Write (store-then-load) order, which cannot be achieved with any combination of memory_order_acquire and memory_order_release. So memory_order_seq_cst should be used.
