Ring buffer with atomic indexes


Problem description


I have struggled with what must be a fundamental misunderstanding of how atomics work in C++. I have written the code below to implement a fast ring buffer using atomic variables for indexes so multiple threads can write to and read from the buffer. I've whittled the code down to this simple case (which I realize is still a little long. Sorry.). If I run this on either Linux or Mac OS X, it will work some of the time, but it will also throw exceptions at least 10% of the time. It also seems to run very fast, and then slow down, and maybe even speed up again also suggesting something is not quite right. I cannot understand the flaw in my logic. Do I need a fence somewhere?


Here's a simple description of what it's trying to do: atomic index variables are bumped up using the compare_exchange_weak method. This is to guarantee exclusive access to the slot the index was bumped from. Two indices are actually needed so that values are not overwritten as we wrap around the ring buffer. More details are embedded in the comments.

#include <mutex>
#include <atomic>
#include <iostream>
#include <cstdint>
#include <vector>
#include <thread>
using namespace std;


const uint64_t Nevents = 1000000;
std::atomic<uint64_t> Nwritten(0);
std::atomic<uint64_t> Nread(0);
#define MAX_EVENTS 10

mutex MTX;

std::atomic<uint32_t> iread{0};  // The slot that the next thread will try to read from
std::atomic<uint32_t> iwrite{0}; // The slot that the next thread will try to write to
std::atomic<uint32_t> ibegin{0}; // The slot indicating the beginning of the read region
std::atomic<uint32_t> iend{0};   // The slot indicating one-past-the-end of the read region
std::atomic<uint64_t> EVENT_QUEUE[MAX_EVENTS];

//-------------------------------
// WriteThreadATOMIC
//-------------------------------
void WriteThreadATOMIC(void)
{
    MTX.lock();
    MTX.unlock();

    while( Nwritten < Nevents ){

        // Copy (atomic) iwrite index to local variable and calculate index
        // of next slot after it
        uint32_t idx = iwrite;
        uint32_t inext = (idx + 1) % MAX_EVENTS;
        if(inext == ibegin){
            // Queue is full
            continue;
        }

        // At this point it looks like slot "idx" is available to write to.
        // The next call ensures only one thread actually does write to it
        // since the compare_exchange_weak will succeed for only one.
        if(iwrite.compare_exchange_weak(idx, inext))
        {
            // OK, we've claimed exclusive access to the slot. We've also
            // bumped the iwrite index so another writer thread can try
            // writing to the next slot. Now we write to the slot.
            if(EVENT_QUEUE[idx] != 0) {lock_guard<mutex> lck(MTX); cerr<<__FILE__<<":"<<__LINE__<<endl; throw -1;} // Dummy check. This should NEVER happen!
            EVENT_QUEUE[idx] = 1;
            Nwritten++;

            if(Nread>Nwritten) {lock_guard<mutex> lck(MTX); cerr<<__FILE__<<":"<<__LINE__<<endl; throw -3;} // Dummy check. This should NEVER happen!

            // The idx slot now contains valid data so bump the iend index to
            // let reader threads know. Note: if multiple writer threads are
            // in play, this may spin waiting for another to bump iend to us
            // before we can bump it to the next slot.
            uint32_t save_idx = idx;
            while(!iend.compare_exchange_weak(idx, inext)) idx = save_idx;
        }
    }
    lock_guard<mutex> lck(MTX);
    cout << "WriteThreadATOMIC done" << endl;
}

//-------------------------------
// ReadThreadATOMIC
//-------------------------------
void ReadThreadATOMIC(void)
{
    MTX.lock();
    MTX.unlock();

    while( Nread < Nevents ){

        uint32_t idx = iread;
        if(idx == iend) {
            // Queue is empty
            continue;
        }
        uint32_t inext = (idx + 1) % MAX_EVENTS;

        // At this point it looks like slot "idx" is available to read from.
        // The next call ensures only one thread actually does read from it
        // since the compare_exchange_weak will succeed for only one.
        if( iread.compare_exchange_weak(idx, inext) )
        {
            // Similar to above, we now have exclusive access to this slot
            // for reading.
            if(EVENT_QUEUE[idx] != 1) {lock_guard<mutex> lck(MTX); cerr<<__FILE__<<":"<<__LINE__<<endl; throw -2;} // Dummy check. This should NEVER happen!
            EVENT_QUEUE[idx] = 0;
            Nread++;

            if(Nread>Nwritten) {lock_guard<mutex> lck(MTX); cerr<<__FILE__<<":"<<__LINE__<<endl; throw -4;} // Dummy check. This should NEVER happen!

            // Bump ibegin freeing idx up for writing
            uint32_t save_idx = idx;
            while(!ibegin.compare_exchange_weak(idx, inext)) idx = save_idx;
        }
    }
    lock_guard<mutex> lck(MTX);
    cout << "ReadThreadATOMIC done" << endl;
}

//-------------------------------
// main
//-------------------------------
int main(int narg, char *argv[])
{
    int Nwrite_threads = 4;
    int Nread_threads = 4;

    for(int i=0; i<MAX_EVENTS; i++) EVENT_QUEUE[i] = 0;

    MTX.lock(); // Hold off threads until all are created

    // Launch writer and reader threads
    vector<std::thread *> atomic_threads;
    for(int i=0; i<Nwrite_threads; i++){
        atomic_threads.push_back( new std::thread(WriteThreadATOMIC) );
    }
    for(int i=0; i<Nread_threads; i++){
        atomic_threads.push_back( new std::thread(ReadThreadATOMIC) );
    }

    // Release all threads and wait for them to finish
    MTX.unlock();
    while( Nread < Nevents) {
        std::this_thread::sleep_for(std::chrono::microseconds(1000000));
        cout << "Nwritten: " << Nwritten << "  Nread: " << Nread << endl;
    }

    // Join threads
    for(auto t : atomic_threads) t->join();
}


When I have caught this in a debugger, it is usually due to a wrong value in the EVENT_QUEUE slot. Sometimes, though, the Nread count exceeds Nwritten, which seems like it should be impossible. I don't think I need a fence, since everything is atomic, but at this point I can't be sure, since I have to question everything I think I know.


Any suggestion or insight would be appreciated.

Recommended answer


I have built this exact structure before, and your implementation is pretty much what I had at one point, which also had issues. The issues come down to the fact that ring buffers, since they continually reuse the same memory, are particularly susceptible to ABA problems.


If you aren't aware, an ABA problem is one where you acquire a value A and later check that the value is still A to ensure you're still in a good state, but unbeknownst to you, the value actually changed from A to B and then back to A.
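
To make this concrete, here is a toy illustration (my own sketch, not code from the question) of why a successful compare-and-swap does not prove that nothing happened in between:

#include <atomic>
#include <cassert>

int main()
{
    std::atomic<int> x{0};    // value A
    int expected = x.load();  // this thread observes A

    // Imagine this thread is descheduled here while other threads run:
    x.store(1);               // A -> B
    x.store(0);               // B -> A again

    // The CAS succeeds because the value matches again, even though the
    // state it stood for changed twice in the meantime.
    bool ok = x.compare_exchange_strong(expected, 2);
    assert(ok); // succeeds, yet "x never changed" would be a false conclusion
}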


I'll point out a scenario in your writer, but the reader has the same problem:

// Here you check if you can even do the write, lets say it succeeds.
uint32_t idx = iwrite;
uint32_t inext = (idx + 1) % MAX_EVENTS;
if(inext == ibegin)
    continue;

// Here you do a compare exchange to ensure that nothing has changed
// out from under you, but lets say your thread gets unscheduled, giving
// time for plenty of other reads and writes occur, enough writes that
// your buffer wraps around such that iwrite is back to where it was at.
// The compare exchange can succeed, but your condition above may not
// still be good anymore!
if(iwrite.compare_exchange_weak(idx, inext))
{
    ...


I don't know if there's a better way to solve this problem, but in my experience adding an extra check after the exchange still had problems. I ultimately solved it by adding additional atomics that kept track of write-reserved and read-reserved counts, so that even when the indices wrapped around, I could guarantee that the space was still safe to work on. There may be other solutions.
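
A minimal sketch of that counter idea (the names CAPACITY, write_reserved, read_reserved, and try_reserve_write are mine, not from any real implementation): keep free-running 64-bit reservation counters that only ever increase, and reduce a claimed count to a ring slot only after the compare_exchange succeeds. A stale counter value can then never be mistaken for the current one, because the count itself never repeats:

#include <atomic>
#include <cstdint>
#include <iostream>

constexpr uint64_t CAPACITY = 10;
std::atomic<uint64_t> write_reserved{0}; // total slots ever claimed for writing
std::atomic<uint64_t> read_reserved{0};  // total slots ever claimed for reading

// Try to claim one slot for writing. Because the counter is 64-bit and only
// ever increases, a compare_exchange on a stale value cannot succeed after
// the buffer wraps around -- the ABA window of the modulo-indexed version
// is gone.
bool try_reserve_write(uint64_t &slot)
{
    uint64_t w = write_reserved.load();
    for(;;){
        // Full when writers have claimed a whole buffer more than readers.
        // (A stale w can only make this spuriously report "full"; callers
        // simply retry, so correctness is not affected.)
        if(w - read_reserved.load() >= CAPACITY) return false;
        // On failure, compare_exchange_weak reloads the current count into w
        // and we re-check fullness before trying again.
        if(write_reserved.compare_exchange_weak(w, w + 1)){
            slot = w % CAPACITY; // reduce to a ring slot only after the claim
            return true;
        }
    }
}

int main()
{
    uint64_t slot;
    if(try_reserve_write(slot))
        std::cout << "claimed slot " << slot << std::endl;
}

A complete buffer would still need publish counters (the analogue of iend and ibegin above) so that readers only consume slots whose writes have finished; this sketch only shows the reservation step that closes the ABA window.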

Disclaimer: this may not be your only problem.
