无锁双向链表的原子操作 [英] Atomic operations for lock-free doubly linked list

查看:2034
本文介绍了无锁双向链表的原子操作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我根据这些论文撰写了一个无锁双重链接列表:



基于引用计数的高效可靠的无锁内存回收
Anders Gidenstam,IEEE成员,Marina Papatriantafilou,H˚akan Sundell和Philippas Tsigas



无锁定和双向链接列表
HåkanSundell, Philippas Tsigas



对于这个问题,我们可以把第一张纸放在一边。



用于在字中存储删除标志和指针的方式。
(更多信息这里



伪代码对于本节中的此部分:

  union链接
:word
(p,d):{指向Node的指针,boolean}

结构Node
value:指向字
上一个:union Link
下一个:union Link
  template<类型名NodeT> 
struct LockFreeLink
{
public:
typedef NodeT NodeType;

private:

protected:
std :: atomic< NodeT *> mPointer;

public:
bcLockFreeLink()
{
std :: atomic_init(& mPointer,nullptr);
}
〜bcLockFreeLink(){}

inline NodeType * getNode()const throw()
{
return std :: atomic_load mPointer,std :: memory_order_relaxed);
}
inline std :: atomic< NodeT *> * getAtomicNode()const throw()
{
return& mPointer;
}
};

struct node:public LockFreeNode
{
struct Link:protected LockFreeLink<节点>
{
static const int dMask = 1;
static const int ptrMask =〜dMask;

Link(){} throw()
Link(const Node * pPointer,bcBOOL pDel = bcFALSE)throw()
{
std :: atomic_init ; mPointer,(reinterpret_cast< int>(pPointer)|(int)pDel));
}

Node * pointer()const throw()
{
return reinterpret_cast< Node *>(
std :: atomic_load data,std :: memory_order_relaxed)& ptrMask);
}
bool del()const throw()
{
return std :: atomic_load(& data,std :: memory_order_relaxed)& dMask;
}
bool compareAndSwap(const Link& pExpected,const Link& pNew)throw()
{
Node * lExpected = std :: atomic_load(& pExpected.mPointer,std :: memory_order_relaxed);
Node * lNew = std :: atomic_load(& pNew.mPointer,std :: memory_order_relaxed);

return std :: atomic_compare_exchange_strong_explicit(
& mPointer,
& lExpected,
lNew,
std :: memory_order_relaxed,
std :: memory_order_relaxed);
}

bool operator ==(const Link& pOther)throw()
{
return std :: atomic_load(data,std :: memory_order_relaxed)==
std :: atomic_load(pOther.data,std :: memory_order_relaxed);
}
bool operator!=(const Link& pOther)throw()
{
return!operator ==(pOther);
}
};

链接mPrev;
Link mNext;
键入mData;

Node(){};
Node(const Type& pValue):mData(pValue){};
};

在本文中,设置删除标记的链接到true的功能:



过程SetMark(link:指向Node的指针)
while true do
node = * link;
如果node.d = true或CAS(link,node,(node.p,true))then break;

这个函数的代码如下:

  void _setMark(Link * pLink)
{
while(bcTRUE)
{
Link lOld = * pLink;
if(pLink-> del()|| pLink-> compareAndSwap(lOld,Link(pLink-> pointer(),bcTRUE)))
break;但是我的问题在于
}
}


$ b < compareAndSwap
函数,其中i必须比较和交换三个原子变量。有关问题的信息是此处



(实际上 new 变量在比较和交换函数不重要,因为它是线程局部)



现在我的问题:如何写compareAndSwap



p>编辑:



类似的问题出现在内存管理器中:

  function CompareAndSwapRef(link:指向toNode的指针,
old:指向toNode,new:指向toNode):boolean
如果CAS(link,old,new)then
if new =
FAA(& new.mmref,1);
new.mmtrace:= false;
如果old = NULL,则FAA(& old.mmref,-1);
return true;
return false;

这里我必须比较和交换三个原子变量。
(注意我的参数是 Link 的类型,我必须比较和交换 mPointer $ c> Link

解决方案

在x64上,只使用44位的地址空间。如果你的指针对齐到8个字节,那么你只使用41位。 41x2对于64位仍然太大。有一个128位的比较和交换,虽然我不能保证它的速度。我总是试图使用64位一。



也许你最多只需要20亿个节点。所以你可以做的是预分配一个列表从中拉出的节点池。当然,通过使用原子操作获取下一个空闲池索引来创建节点。然后,而不是next和prev是指针,它们可以是31位索引到节点池中,并且您有2位剩余的删除标志。假设你不需要20亿个节点,你还有更多的位。唯一的缺点是你必须知道在启动时你需要多少个节点,虽然你可以重新分配节点,如果你也有。



我所做的是使用虚拟内存函数来保留GB的地址空间,然后将物理RAM映射到该空间,因为我需要它来扩展我的池,而不必重新分配。


I am writing a lock-free doubly linked list based on these papers:

"Efficient and Reliable Lock-Free Memory Reclamation Based on Reference Counting" Anders Gidenstam,Member, IEEE,Marina Papatriantafilou, H˚ akan Sundell and Philippas Tsigas

"Lock-free deques and doubly linked lists" Håkan Sundell, Philippas Tsigas

For this question we can put aside first paper.

In this paper, they use a smart way for storing a deletion flag and a pointer in a word. (More info here)

Pseudo code for this section in the paper:

union Link
    : word
    (p,d): {pointer to Node, boolean} 

structure Node
    value: pointer to word
    prev: union Link
    next: union Link

And my code for above pseudo code:

template< typename NodeT >
struct LockFreeLink
{
public:
    typedef NodeT NodeType;

private:

protected:
    std::atomic< NodeT* > mPointer;

public:
    bcLockFreeLink()
    {
        std::atomic_init(&mPointer, nullptr);
    }
    ~bcLockFreeLink() {}

    inline NodeType* getNode() const throw()
    {
        return std::atomic_load(&mPointer, std::memory_order_relaxed);
    }
    inline std::atomic< NodeT* >* getAtomicNode() const throw()
    {
        return &mPointer;
    }
};

struct Node : public LockFreeNode
{
    struct Link : protected LockFreeLink< Node >
    {
        static const int dMask = 1;
        static const int ptrMask = ~dMask;

        Link() { } throw()
        Link(const Node* pPointer, bcBOOL pDel = bcFALSE) throw()
        { 
            std::atomic_init(&mPointer, (reinterpret_cast<int>(pPointer) | (int)pDel)); 
        }

        Node* pointer() const throw() 
        { 
            return reinterpret_cast<Node*>(
                std::atomic_load(&data, std::memory_order_relaxed) & ptrMask); 
        }
        bool del() const throw() 
        { 
            return std::atomic_load(&data, std::memory_order_relaxed) & dMask; 
        }
        bool compareAndSwap(const Link& pExpected, const Link& pNew) throw() 
        { 
            Node* lExpected = std::atomic_load(&pExpected.mPointer, std::memory_order_relaxed);
            Node* lNew = std::atomic_load(&pNew.mPointer, std::memory_order_relaxed);

            return std::atomic_compare_exchange_strong_explicit(
                &mPointer,
                &lExpected,
                lNew,
                std::memory_order_relaxed,
                std::memory_order_relaxed); 
        }

        bool operator==(const Link& pOther) throw() 
        { 
            return std::atomic_load(data, std::memory_order_relaxed) == 
                std::atomic_load(pOther.data, std::memory_order_relaxed); 
        }
        bool operator!=(const Link& pOther) throw() 
        { 
            return !operator==(pOther); 
        }
    };

    Link mPrev;
    Link mNext;
    Type mData;

    Node() {};
    Node(const Type& pValue) : mData(pValue) {};
};

In this paper there is this function for set deletion mark of link to true:

procedure SetMark(link: pointer to pointer to Node)
    while true do
       node = *link;
       if node.d = true or CAS(link, node, (node.p, true)) then break;

And my code for this function:

void _setMark(Link* pLink)
{
    while (bcTRUE)
    {
        Link lOld = *pLink;
        if(pLink->del() || pLink->compareAndSwap(lOld, Link(pLink->pointer(), bcTRUE)))
            break;
    }
}

But my problem is in compareAndSwap function where i must compare and swap three atomic variable. Information about problem is here

(Actually new variable in compare and swap function isn't important because it is thread local)

Now my question: how can i write compareAndSwap function to compare and swap three atomic varialbe or where am i making mistake?

(Excuse me for long question)

Edit:

similar problem is in memory manager paper:

function CompareAndSwapRef(link:pointer to pointer toNode,
old:pointer toNode, new:pointer toNode):boolean
    if CAS(link,old,new) then
        if new=NULL then
            FAA(&new.mmref,1);
            new.mmtrace:=false;
    if old=NULLthen FAA(&old.mmref,-1);
    return true;
return false; 

here again i must compare and swap three atomic variable. (Note that my arguments are type of Link and i must compare and swap mPointer of Link)

解决方案

On x64, only 44 bits of address space are used. If your pointers are aligned to 8 bytes then you are only using 41 bits. 41x2 is still too large for 64 bits. There is a 128 bit compare and swap although I can't vouch for its speed. I always try to use the 64 bit one.

Maybe you only need up to 2 billion nodes. So what you could do is preallocate a pool of nodes that the list pulls from. You create nodes by grabbing the next free pool index using atomic ops of course. Then instead of next and prev being pointers, they could be 31 bit indexes into the node pool and you have 2 bits left over for delete flags. Assuming you don't need 2 billion nodes, you have even more bits left over. The only downside is you have to know how many nodes you are going to need at startup, although you could realloc the nodes if you had too.

What I have done is used virtual memory functions to reserve GB of address space and then map physical ram into that space as I need it to extend my pool without having to reallocate.

这篇关于无锁双向链表的原子操作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆