基于等待任务的队列 [英] awaitable Task based queue

查看:30
本文介绍了基于等待任务的队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想知道是否存在 ConcurrentQueue,类似于 BlockingCollection,其中从集合中取出不会阻塞,但是而是异步的,并且会导致异步等待,直到将项目放入队列中.

I'm wondering if there exists an implementation/wrapper for ConcurrentQueue, similar to BlockingCollection where taking from the collection does not block, but is instead asynchronous and will cause an async await until an item is placed in the queue.

我已经提出了自己的实现,但它似乎没有按预期执行.我想知道我是否在重新发明已经存在的东西.

I've come up with my own implementation, but it does not seem to be performing as expected. I'm wondering if I'm reinventing something that already exists.

这是我的实现:

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();

    object queueSyncLock = new object();

    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> Dequeue()
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs=null;
        T firstItem=default(T);
        while (true)
        {
            bool ok;
            lock (queueSyncLock)
            {
                ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
                if (ok)
                {
                    waitingQueue.TryDequeue(out tcs);
                    queue.TryDequeue(out firstItem);
                }
            }
            if (!ok) break;
            tcs.SetResult(firstItem);
        }
    }
}

推荐答案

我不知道有没有锁的解决方案,但是你可以看看新的 数据流库异步 CTP.一个简单的 BufferBlock 就足够了,例如:

I don't know of a lock-free solution, but you can take a look at the new Dataflow library, part of the Async CTP. A simple BufferBlock<T> should suffice, e.g.:

BufferBlock<int> buffer = new BufferBlock<int>();

生产和消费最容易通过数据流块类型的扩展方法完成.

Production and consumption are most easily done via extension methods on the dataflow block types.

生产就这么简单:

buffer.Post(13);

并且消费是异步就绪的:

and consumption is async-ready:

int item = await buffer.ReceiveAsync();

如果可能,我建议您使用 Dataflow;使这样的缓冲区既有效又正确,比最初看起来要困难得多.

I do recommend you use Dataflow if possible; making such a buffer both efficient and correct is more difficult than it first appears.

这篇关于基于等待任务的队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆