awaitable基于任务队列 [英] awaitable Task based queue

查看:140
本文介绍了awaitable基于任务队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我不知道是否存在的 ConcurrentQueue ,类似于<实施/包装A HREF =htt​​p://msdn.microsoft.com/en-us/library/dd267312.aspx> BlockingCollection ,其中以从集合不会阻止,但反而是异步的,会导致一个异步等待,直到一项被放置在队列中。

我拿出我自己的实现,但它似乎并不像预期的那样表演。我想知道如果我重新发明已经存在的东西。

下面是我实现的:

 公共类的MessageQueue&LT; T&GT;
{
    ConcurrentQueue&LT; T&GT;队列=新ConcurrentQueue&LT; T&GT;();    ConcurrentQueue&LT; TaskCompletionSource&LT; T&GT;&GT; waitingQueue =
        新ConcurrentQueue&LT; TaskCompletionSource&LT; T&GT;&GT;();    反对queueSyncLock =新的对象();    公共无效排队(T项)
    {
        queue.Enqueue(项目);
        ProcessQueues();
    }    公共异步任务&LT; T&GT;出列()
    {
        TaskCompletionSource&LT; T&GT; TCS =新TaskCompletionSource&LT; T&GT;();
        waitingQueue.Enqueue(TCS);
        ProcessQueues();
        返回tcs.Task.IsCompleted? tcs.Task.Result:等待tcs.Task;
    }    私人无效ProcessQueues()
    {
        TaskCompletionSource&LT; T&GT; TCS = NULL;
        ŧ=与firstItem默认(T);
        而(真)
        {
            布尔OK;
            锁定(queueSyncLock)
            {
                OK = waitingQueue.TryPeek(TCS出)及;&安培; queue.TryPeek(出与firstItem);
                如果(OK)
                {
                    waitingQueue.TryDequeue(出TCS);
                    queue.TryDequeue(出与firstItem);
                }
            }
            如果突破(好!);
            tcs.SetResult(与firstItem);
        }
    }
}


解决方案

我不知道无锁解决方案,但你可以看看在新的数据流库的的异步CTP 。一个简单的 BufferBlock&LT; T&GT; 应该足够了,例如:

  BufferBlock&LT; INT&GT;缓冲区=新BufferBlock&LT;&诠释GT;();

生产和消费是最容易通过扩展方法做了数据流块类型。

制作很简单,只要:

  buffer.Post(13);

和消费是异步准备:

  INT项=等待buffer.ReceiveAsync();

我建议如果可能的话你使用数据流;作出这样的缓冲既高效又正确难度较第一次出现了。

I'm wondering if there exists an implementation/wrapper for ConcurrentQueue, similar to BlockingCollection where taking from the collection does not block, but is instead asynchronous and will cause an async await until an item is placed in the queue.

I've come up with my own implementation, but it does not seem to be performing as expected. I'm wondering if I'm reinventing something that already exists.

Here's my implementation:

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();

    object queueSyncLock = new object();

    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> Dequeue()
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs=null;
        T firstItem=default(T);
        while (true)
        {
            bool ok;
            lock (queueSyncLock)
            {
                ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
                if (ok)
                {
                    waitingQueue.TryDequeue(out tcs);
                    queue.TryDequeue(out firstItem);
                }
            }
            if (!ok) break;
            tcs.SetResult(firstItem);
        }
    }
}

解决方案

I don't know of a lock-free solution, but you can take a look at the new Dataflow library, part of the Async CTP. A simple BufferBlock<T> should suffice, e.g.:

BufferBlock<int> buffer = new BufferBlock<int>();

Production and consumption are most easily done via extension methods on the dataflow block types.

Production is as simple as:

buffer.Post(13);

and consumption is async-ready:

int item = await buffer.ReceiveAsync();

I do recommend you use Dataflow if possible; making such a buffer both efficient and correct is more difficult than it first appears.

这篇关于awaitable基于任务队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆