多线程队列原子操作 [英] Multithread queue atomic operations

查看:156
本文介绍了多线程队列原子操作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用std :: atomic结构,并编写了这个无锁多生产者多消费者队列,我在这里附加。队列的想法基于两个堆栈 - 生产者和消费者堆栈,它们本质上是链表结构。列表的节点将索引保存在数组中,该数组保存实际数据,您可以在其中读取或写入。



这个想法是列表的节点是互斥的,即,指向节点的指针可以仅存在于生产者或消费者列表中。生产者将尝试从生产者列表获取节点,从消费者列表获取消费者,并且每当生产者或消费者获取到节点的指针时,它应该在两个列表之外,以便没有人能够获取它。我使用std :: atomic_compare_exchange函数来旋转,直到一个节点被弹出。



问题是,必须有一些错误的逻辑或操作不是原子因为我假设他们是因为即使有1个生产者和1个消费者,给定足够的时间,队列将活锁,我注意到,如果你断言cell!= cell-> m_next,断言将得到命中!所以它可能是在面对我的眼睛,我只是看不到它,所以我想知道是否有人可以推销。



Thx

  #ifndef MTQueue_h 
#define MTQueue_h

#include< atomic>

template< typename Data,uint64_t queueSize>
class MTQueue
{
public:

MTQueue():m_produceHead(0),m_consumeHead(0)
{
for i = 0; i {
m_nodes [i] .m_idx = i;
m_nodes [i] .m_next =& m_nodes [i + 1];
}
m_nodes [queueSize-1] .m_idx = queueSize - 1;
m_nodes [queueSize-1] .m_next = NULL;

m_produceHead = m_nodes;
m_consumeHead = NULL;
}

struct CellNode
{
uint64_t m_idx;
CellNode * m_next;
};

bool push(const Data& data)
{
if(m_produceHead == NULL)
return false;

//弹出生产者列表。
CellNode * cell = m_produceHead;
while(!std :: atomic_compare_exchange_strong(& m_produceHead,
& cell,cell-> m_next))
{
cell = m_produceHead;
if(!cell)
return false;
}

//此时单元格应该指向不在任何列表中的节点
m_data [cell-> m_idx] = data;

//将该节点作为消费者列表的新头
cell-> m_next = m_consumeHead;
while(!std :: atomic_compare_exchange_strong(& m_consumeHead,
& cell-> m_next,cell))
{
cell-> m_next = m_consumeHead;
}
return true;
}

bool pop(Data& data)
{
if(m_consumeHead == NULL)
return false;

//弹出消费者列表
CellNode * cell = m_consumeHead;
while(!std :: atomic_compare_exchange_strong(& m_consumeHead,
& cell,cell-> m_next))
{
cell = m_consumeHead;
if(!cell)
return false;
}

//此时单元格应该指向不在任何列表中的节点
data = m_data [cell-> m_idx];

//将该节点作为生产者列表的新头
cell-> m_next = m_produceHead;
while(!std :: atomic_compare_exchange_strong(& m_produceHead,
& cell-> m_next,cell))
{
cell-> m_next = m_produceHead;
}
return true;
};

private:

数据m_data [queueSize];

//两个列表的节点
CellNode m_nodes [queueSize];

volatile std :: atomic< CellNode *> m_produceHead;
volatile std :: atomic< CellNode *> m_consumeHead;
};

#endif


解决方案

I请看您的队列实现的一些问题:


  1. 这不是一个队列,它是一个堆栈:第一项弹出。不是堆栈有什么问题,但令人困惑的是将其称为队列。事实上,它是两个无锁堆栈:一个堆栈最初填充有节点数组,另一个堆栈存储实际数据元素,使用第一个堆栈作为自由节点列表。


  2. push 中, CellNode :: m_next code> pop (毫不奇怪,因为他们都做同样的事情,即从一个堆栈弹出一个节点并将该节点推到另一个堆栈)。说两个线程同时进入例如 pop ,并从 m_consumeHead 读取相同的值。线程1向前成功弹出并设置数据。然后线程1将 m_produceHead 的值写入 cell-> m_next ,而线程2同时读取 cell-> m_next 以传递到 std :: atomic_compare_exchange_strong_explicit 。通过两个线程同时非原子读取和写入 cell-> m_next 是一个数据竞争。



    这是在并发文献中被称为良性竞争:一个失效/无效值被读取,但从未被使用。如果你确信你的代码永远不需要运行在可能导致火爆的架构上,你可以忽略它,但是为了严格符合标准内存模型,你需要使 m_next 一个原子,并使用至少 memory_order_relaxed 读取以消除数据竞争。


  3. 比较交换循环的正确性是基于这样的前提:原子指针(例如, m_produceHead m_consumeHead )在初始加载和后面的比较交换时具有相同的值意味着pointee对象也必须也是不变的。该前提不适用于任何设计,其中可以比一些线程通过其比较交换循环跳闸更快地再循环对象。考虑这个事件序列:


    1. 线程1输入 pop m_consumeHead m_consumeHead-> m_next ,但在调用比较交换之前封锁。

    2. 线程2成功地从 m_consumeHead 中弹出该节点并阻止。

    3. 线程3将多个节点推入 m_consumeHead

    4. 线程2解除阻塞并将原节点推送到 m_produceHead
    5. 线程3从 m_produceHead 中弹出该节点,并将其推回到 m_consumeHead li>
    6. 线程1最后解除阻塞并调用compare-exchange函数,因为 m_consumeHead 的值相同,所以成功。它弹出的节点 - 这是好,好 - 但设置 m_consumeHead 到它回读的stale m_next


    I'm playing with the std::atomic structures and wrote this lock-free multi-producer multi-consumer queue, which I'm attaching here. The idea for the queue is based on two stacks - a producer and consumer stack, which are essentially linked list structures. The nodes of the lists hold the indexes into an array of that holds the actual data, where you would read or write.

    The idea would be that the nodes for the lists are mutually exclusive, ie, a pointer to a node can exist only in the producer or the consumer list. A producer would attempt to acquire a node from the producer list, a consumer from the consumer list and whenever a pointer to a node is acquired by either producer or consumer, it should be out of both lists so that noone else could acquire it. I'm using std::atomic_compare_exchange functions to spin until a node is popped.

    The problem is that there must be something wrong with the logic or the operations are not atomic as I assume them to be because even with 1 producer and 1 consumer, given enough time, the queue will livelock and what I have noticed is that if you assert that cell != cell->m_next, the assert would get hit ! So its probably something staring me in the face and I just don't see it, so I wonder if someone could pitch in.

    Thx

    #ifndef MTQueue_h
    #define MTQueue_h
    
    #include <atomic>
    
    template<typename Data, uint64_t queueSize>
    class MTQueue
    {
    public:
    
        MTQueue() : m_produceHead(0), m_consumeHead(0)
        {
            for(int i=0; i<queueSize-1; ++i)
            {
                m_nodes[i].m_idx = i;
                m_nodes[i].m_next = &m_nodes[i+1];
            }
            m_nodes[queueSize-1].m_idx = queueSize - 1;
            m_nodes[queueSize-1].m_next = NULL;
    
            m_produceHead = m_nodes;
            m_consumeHead = NULL;
        }
    
        struct CellNode
        {
            uint64_t m_idx;
            CellNode* m_next;
        };
    
        bool push(const Data& data)
        {
            if(m_produceHead == NULL)
                return false;
    
            // Pop the producer list.
            CellNode* cell = m_produceHead;
            while(!std::atomic_compare_exchange_strong(&m_produceHead,
                                                       &cell, cell->m_next))
            {
                cell = m_produceHead;
                if(!cell)
                    return false;
            }
    
            // At this point cell should point to a node that is not in any of the lists
            m_data[cell->m_idx] = data;
    
            // Push that node as the new head of the consumer list
            cell->m_next = m_consumeHead;
            while (!std::atomic_compare_exchange_strong(&m_consumeHead,
                                                        &cell->m_next, cell))
            {
                cell->m_next = m_consumeHead;
            }
            return true;
        }
    
        bool pop(Data& data)
        {
            if(m_consumeHead == NULL)
                return false;
    
            // Pop the consumer list
            CellNode* cell = m_consumeHead;
            while(!std::atomic_compare_exchange_strong(&m_consumeHead,
                                                       &cell, cell->m_next))
            {
                cell = m_consumeHead;
                if(!cell)
                    return false;
            }
    
            // At this point cell should point to a node that is not in any of the lists
            data = m_data[cell->m_idx];
    
            // Push that node as the new head of the producer list
            cell->m_next = m_produceHead;
            while(!std::atomic_compare_exchange_strong(&m_produceHead,
                                                       &cell->m_next, cell))
            {
                cell->m_next = m_produceHead;
            }
            return true;
        };
    
    private:
    
        Data m_data[queueSize];
    
        // The nodes for the two lists
        CellNode m_nodes[queueSize];
    
        volatile std::atomic<CellNode*> m_produceHead;
        volatile std::atomic<CellNode*> m_consumeHead;
    };
    
    #endif
    

    解决方案

    I see a few problems with your queue implementation:

    1. It's not a queue, it's a stack: the most recent item pushed is the first item popped. Not that there's anything wrong with stacks, but it's confusing to call it a queue. In fact it is two lock-free stacks: one stack that is initially populated with the array of nodes, and another stack that stores actual data elements using the first stack as a list of free nodes.

    2. There is a data race on CellNode::m_next in both push and pop (unsurprisingly, since they both do the same thing, i.e., pop a node from one stack and push that node onto the other). Say two threads simultaneously enter e.g. pop and both read the same value from m_consumeHead. Thread 1 races ahead successfully popping and sets data. Then Thread 1 writes the value of m_produceHead into cell->m_next while Thread 2 is simultaneously reading cell->m_next to pass to std::atomic_compare_exchange_strong_explicit. The simultaneous non-atomic read and write of cell->m_next by two threads is by definition a data race.

      This is what is known as a "benign" race in the concurrency literature: a stale/invalid value is read, but never gets used. If you are confident that your code will never need to run on an architecture where it could cause fiery explosions you may ignore it, but for strict conformance with the Standard memory model you need to make m_next an atomic and use at least memory_order_relaxed reads to eliminate the data race.

    3. ABA. The correctness of your compare-exchange loops is based on the premise that an atomic pointer (e.g., m_produceHead and m_consumeHead) having the same value at both the initial load and the later compare-exchange implies that the pointee object must therefore be unchanged as well. This premise does not hold in any design in which it is possible to recycle an object faster than some thread makes a trip through its compare-exchange loop. Consider this sequence of events:

      1. Thread 1 enters pop and reads the value of m_consumeHead and m_consumeHead->m_next but blocks before calling the compare-exchange.
      2. Thread 2 successfully pops that node from m_consumeHead and blocks as well.
      3. Thread 3 pushes several nodes onto m_consumeHead.
      4. Thread 2 unblocks and pushes the original node onto m_produceHead.
      5. Thread 3 pops that node from m_produceHead, and pushes it back onto m_consumeHead.
      6. Thread 1 finally unblocks and calls the compare-exchange function, which succeeds since the value of m_consumeHead is the same. It pops the node - which is all well and good - but sets m_consumeHead to the stale m_next value it read back in step 1. All the nodes pushed by Thread 3 in the meantime are leaked.

    这篇关于多线程队列原子操作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆