创建阻塞队列< T>在.NET? [英] Creating a blocking Queue<T> in .NET?

查看:131
本文介绍了创建阻塞队列< T>在.NET?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一种情况,我有相同的队列中读取多个线程添加到队列和多线程。如果队列达到特定大小所有线程被填充的队列将被阻止在添加到一个项目从队列中删除。

下面的解决方案是,我使用的是什么,现在,我的问题是:这怎么可能提高?是否有一个对象,它已经实现,我应该使用的BCL这种行为?

 内部类BlockingCollection< T> :CollectionBase的,IEnumerable的
{
    // TODO:可能是值得改变这一到适当的队列

    私人的AutoResetEvent _FullEvent =新的AutoResetEvent(假);

    内部温度T这[INT I]
    {
        {返回(T)名单[I] }
    }

    私人诠释_MaxSize;
    内部INT MAXSIZE
    {
        {返回_MaxSize; }
        组
        {
            _MaxSize =价值;
            checkSize();
        }
    }

    内部BlockingCollection(INT MAXSIZE)
    {
        MAXSIZE = MAXSIZE;
    }

    内部空隙添加(T项)
    {
        Trace.WriteLine(的String.Format(BlockingCollection添加等待:{0},Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(项目);

        Trace.WriteLine(的String.Format(BlockingCollection产品加入:{0},Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    内部无效删除(T项目)
    {
        锁定(名单)
        {
            List.Remove(项目);
        }

        Trace.WriteLine(的String.Format(BlockingCollection项目中删除:{0},Thread.CurrentThread.ManagedThreadId));
    }

    保护覆盖无效OnRemoveComplete在(INT指数,目标价值)
    {
        checkSize();
        base.OnRemoveComplete(指数值);
    }

    内部新的IEnumerator的GetEnumerator()
    {
        返回List.GetEnumerator();
    }

    私人无效checkSize()
    {
        如果(计数< MAXSIZE)
        {
            Trace.WriteLine(的String.Format(BlockingCollection FullEvent集:{0},Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        其他
        {
            Trace.WriteLine(的String.Format(BlockingCollection FullEvent复位:{0},Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}
 

解决方案

这看起来非常不安全的(非常小的同步);怎么是这样的:

 类SizeQueue< T>
{
    私人只读队列< T>队列=新的队列< T>();
    私人只读INT最大范围;
    公共SizeQueue(INT MAXSIZE){this.maxSize = MAXSIZE; }

    公共无效排队(T项)
    {
        锁(队列)
        {
            而(queue.Count> = MAXSIZE)
            {
                Monitor.Wait(队列);
            }
            queue.Enqueue(项目);
            如果(queue.Count == 1)
            {
                //唤醒任何阻塞的出队
                Monitor.PulseAll(队列);
            }
        }
    }
    公共牛逼出列()
    {
        锁(队列)
        {
            而(queue.Count == 0)
            {
                Monitor.Wait(队列);
            }
            牛逼项目= queue.Dequeue();
            如果(queue.Count == MAXSIZE  -  1)
            {
                //唤醒任何阻止入队
                Monitor.PulseAll(队列);
            }
            返回的项目;
        }
    }
}
 

(编辑)

在现实中,你想要的方式来关闭队列,以便读者开始退出干净 - 也许像一个布尔标志 - 如果设置,空队列只返回(而不是阻塞):

 布尔收盘;
公共无效关闭()
{
    锁(队列)
    {
        关闭= TRUE;
        Monitor.PulseAll(队列);
    }
}
公共BOOL TryDequeue(出的T值)
{
    锁(队列)
    {
        而(queue.Count == 0)
        {
            如果(收盘)
            {
                值=默认(T);
                返回false;
            }
            Monitor.Wait(队列);
        }
        值= queue.Dequeue();
        如果(queue.Count == MAXSIZE  -  1)
        {
            //唤醒任何阻止入队
            Monitor.PulseAll(队列);
        }
        返回true;
    }
}
 

I have a scenario where I have multiple threads adding to a queue and multiple threads reading from the same queue. If the queue reaches a specific size all threads that are filling the queue will be blocked on add until an item is removed from the queue.

The solution below is what I am using right now and my question is: How can this be improved? Is there an object that already enables this behavior in the BCL that I should be using?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}

解决方案

That looks very unsafe (very little synchronization); how about something like:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(edit)

In reality, you'd want a way to close the queue so that readers start exiting cleanly - perhaps something like a bool flag - if set, an empty queue just returns (rather than blocking):

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}

这篇关于创建阻塞队列&LT; T&GT;在.NET?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆