Is this lock-free .NET queue thread safe?


Problem Description

My question is: is the single-reader / single-writer queue class included below thread-safe? This kind of queue is called lock-free, even though it will block if the queue fills up. The data structure was inspired by Marc Gravell's implementation of a blocking queue here at StackOverflow.

The point of the structure is to allow a single thread to write data to the buffer, and another thread to read data. All of this needs to happen as quickly as possible.

A similar data structure is described in an article at DDJ by Herb Sutter, except that the implementation is in C++. Another difference is that instead of a vanilla linked list, I use a linked list of arrays.

Rather than just including a snippet of code, I am including the whole thing, with comments, under a permissive open-source license (MIT License 1.0) in case anyone finds it useful and wants to use it (as-is or modified).

This is related to other questions asked on Stack Overflow about how to create blocking concurrent queues (see Creating a blocking Queue in .NET and Thread-safe blocking queue implementation in .NET).

Here is the code:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;

namespace CollectionSandbox
{
    /// This is a single reader / single writer buffered queue implemented
    /// with (almost) no locks. This implementation will block only if filled 
    /// up. The implementation is a linked-list of arrays.
    /// It was inspired by the desire to create a non-blocking version 
    /// of the blocking queue implementation in C# by Marc Gravell
    /// http://stackoverflow.com/questions/530211/creating-a-blocking-queuet-in-net/530228#530228
    class SimpleSharedQueue<T> : IStreamBuffer<T>
    {
        /// Used to signal things are no longer full
        ManualResetEvent canWrite = new ManualResetEvent(true);

        /// This is the size of a buffer 
        const int BUFFER_SIZE = 512;

        /// This is the maximum number of nodes. 
        const int MAX_NODE_COUNT = 100;

        /// This marks the location to write new data to.
        Cursor adder;

        /// This marks the location to read new data from.
        Cursor remover;

        /// Indicates that no more data is going to be written to the queue.
        public bool completed = false;

        /// A node is an array of data items, a pointer to the next item,
        /// and an index of the number of occupied items.
        class Node
        {
            /// Where the data is stored.
            public T[] data = new T[BUFFER_SIZE];

            /// The next node in the list.
            public Node next;

            /// The number of data items currently stored in the node.
            public int count;

            /// Default constructor, only used for the first node.
            public Node()
            {
                count = 0;
            }

            /// Only ever called by the writer to add new Nodes to the queue.
            public Node(T x, Node prev)
            {
                data[0] = x;
                count = 1;

                // The previous node has to be safely updated to point to this node.
                // A reader could be looking at this pointer while we set it, so this should be
                // atomic.
                Interlocked.Exchange(ref prev.next, this);
            }
        }

        /// This is used to point to a location within a single node, and can perform 
        /// reads or writes. One cursor will only ever read, and another cursor will only
        /// ever write.
        class Cursor
        {
            /// Points to the parent Queue
            public SimpleSharedQueue<T> q;

            /// The current node
            public Node node;

            /// For a writer, this points to the position that the next item will be written to.
            /// For a reader, this points to the position that the next item will be read from.
            public int current = 0;

            /// Creates a new cursor, pointing to the node
            public Cursor(SimpleSharedQueue<T> q, Node node)
            {
                this.q = q;
                this.node = node;
            }

            /// Used to push more data onto the queue
            public void Write(T x)
            {
                Trace.Assert(current == node.count);

                // Check whether we are at the node limit, and are going to need to allocate a new buffer.
                if (current == BUFFER_SIZE)
                {
                    // Check if the queue is full
                    if (q.IsFull())
                    {
                        // Signal the canWrite event to false
                        q.canWrite.Reset();

                        // Wait until the canWrite event is signaled 
                        q.canWrite.WaitOne();
                    }

                    // create a new node
                    node = new Node(x, node);
                    current = 1;
                }
                else
                {
                    // If the implementation is correct then the reader will never try to access this 
                    // array location while we set it. This is because of the invariant that 
                    // if reader and writer are at the same node: 
                    //    reader.current < node.count 
                    // and 
                    //    writer.current = node.count 
                    node.data[current++] = x;

                    // We have to use Interlocked to ensure that we increment the count
                    // atomically, because the reader could be reading it.
                    Interlocked.Increment(ref node.count);
                }
            }

            /// Pulls data from the queue; returns false only if there is
            /// no more data and the queue has been marked completed.
            public bool Read(ref T x)
            {
                while (true)
                {
                    if (current < node.count)
                    {
                        x = node.data[current++];
                        return true;
                    }
                    else if ((current == BUFFER_SIZE) && (node.next != null))
                    {
                        // Move the current node to the next one.
                        // We know it is safe to do so.
                        // The old node will have no more references to it
                        // and will be deleted by the garbage collector.
                        node = node.next;

                        // If there is a writer thread waiting on the Queue,
                        // then release it.
                        // Conceptually there is an "if (q.IsFull)", but we can't place it
                        // there because that would lead to a race condition.
                        q.canWrite.Set();

                        // point to the first spot                
                        current = 0;

                        // One of the invariants is that every node created after the first
                        // will have at least one item, so the following call is safe.
                        x = node.data[current++];
                        return true;
                    }

                    // If we get here, we have read the most recently added data.
                    // We then check to see if the writer has finished producing data.
                    if (q.completed)
                        return false;

                    // If we get here there is no data waiting, and the writer has not flagged completion.
                    // Wait a millisecond. The system will also context switch.
                    // This will allow the writing thread some additional resources to pump out
                    // more data (especially if it itself is multithreaded).
                    Thread.Sleep(1);
                }
            }
        }

        /// Returns the number of nodes currently used.
        private int NodeCount
        {
            get
            {
                int result = 0;
                Node cur = null;
                Interlocked.Exchange<Node>(ref cur, remover.node);

                // Counts all nodes from the remover to the adder
                // Not efficient, but this is not called often. 
                while (cur != null)
                {
                    ++result;
                    Interlocked.Exchange<Node>(ref cur, cur.next);
                }
                return result;
            }
        }

        /// Construct the queue.
        public SimpleSharedQueue()
        {
            Node root = new Node();
            adder = new Cursor(this, root);
            remover = new Cursor(this, root);
        }

        /// Indicate to the reader that no more data is going to be written.
        public void MarkCompleted()
        {
            completed = true;
        }

        /// Read the next piece of data. Returns false if there is no more data. 
        public bool Read(ref T x)
        {
            return remover.Read(ref x);
        }

        /// Writes more data.
        public void Write(T x)
        {
            adder.Write(x);
        }

        /// Tells us if there are too many nodes and we can't add any more.
        private bool IsFull()
        {
            return NodeCount == MAX_NODE_COUNT;  
        }
    }
}
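
For reference, here is a minimal sketch of how such a queue is intended to be driven: one writer thread pushes items and then calls MarkCompleted, while one reader thread calls Read until it returns false. The element type, item count, and console output are illustrative assumptions only, and the sketch presumes the IStreamBuffer<T> interface (not shown in the question) is available so the class above compiles.

using System;
using System.Threading;

namespace CollectionSandbox
{
    static class UsageSketch
    {
        static void Main()
        {
            var queue = new SimpleSharedQueue<int>();

            // Single writer: push a fixed number of items, then signal completion.
            var writer = new Thread(() =>
            {
                for (int i = 0; i < 100000; i++)
                    queue.Write(i);
                queue.MarkCompleted();
            });

            // Single reader: drain items until Read reports there is no more data.
            var reader = new Thread(() =>
            {
                int item = 0;
                long sum = 0;
                while (queue.Read(ref item))
                    sum += item;
                Console.WriteLine("Consumed all items, sum = " + sum);
            });

            writer.Start();
            reader.Start();
            writer.Join();
            reader.Join();
        }
    }
}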

Solution

Microsoft Research CHESS should prove to be a good tool for testing your implementation.
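
Short of running the code under a model checker, a hand-rolled stress test can at least catch gross problems such as lost or reordered items. The following is a rough sketch under my own assumptions (the item count, element type, and pass/fail reporting are invented for illustration, and it is no substitute for a tool like CHESS): it checks that the single reader observes the single writer's items in FIFO order and that none go missing.

using System;
using System.Threading;

namespace CollectionSandbox
{
    static class StressTestSketch
    {
        static void Main()
        {
            const int Items = 1000000;
            var queue = new SimpleSharedQueue<int>();

            // Writer: produce a strictly increasing sequence, then mark completion.
            var writer = new Thread(() =>
            {
                for (int i = 0; i < Items; i++)
                    queue.Write(i);
                queue.MarkCompleted();
            });

            int received = 0;
            bool inOrder = true;

            // Reader: every item should equal the next expected sequence number.
            var reader = new Thread(() =>
            {
                int item = 0;
                while (queue.Read(ref item))
                {
                    if (item != received)
                        inOrder = false;
                    received++;
                }
            });

            writer.Start();
            reader.Start();
            writer.Join();
            reader.Join();

            Console.WriteLine("Received {0} of {1} items; FIFO order preserved: {2}",
                received, Items, inOrder);
        }
    }
}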
