如何(以及是否)使用 TPL 编写单消费者队列? [英] How (and if) to write a single-consumer queue using the TPL?

查看:21
本文介绍了如何(以及是否)使用 TPL 编写单消费者队列?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我最近听到了很多关于 .NET 4.0 中的 TPL 的播客.他们中的大多数都描述了下载图像或进行计算等后台活动,使用任务以使工作不会干扰 GUI 线程.

I've heard a bunch of podcasts recently about the TPL in .NET 4.0. Most of them describe background activities like downloading images or doing a computation, using tasks so that the work doesn't interfere with a GUI thread.

我处理的大部分代码都具有多生产者/单一消费者的风格,其中来自多个来源的工作项必须排队,然后按顺序处理.一个示例是日志记录,其中来自多个线程的日志行按顺序排列到单个队列中,以最终写入文件或数据库.来自任何单一来源的所有记录都必须保持有序,并且来自同一时间的记录在最终输出中应该彼此接近".

Most of the code I work on has more of a multiple-producer / single-consumer flavor, where work items from multiple sources must be queued and then processed in order. One example would be logging, where log lines from multiple threads are sequentialized into a single queue for eventual writing to a file or database. All the records from any single source must remain in order, and records from the same moment in time should be "close" to each other in the eventual output.

所以多个线程或任务或其他都在调用队列器:

So multiple threads or tasks or whatever are all invoking a queuer:

lock( _queue ) // or use a lock-free queue!
{
   _queue.enqueue( some_work );
   _queueSemaphore.Release();
}

一个专门的工作线程处理队列:

And a dedicated worker thread processes the queue:

while( _queueSemaphore.WaitOne() )
{
   lock( _queue )
   {
      some_work = _queue.dequeue();     
   }
   deal_with( some_work );
}

将工作线程专用于这些任务的消费者端似乎总是合理的.我应该改用 TPL 中的一些构造来编写未来的程序吗?哪一个?为什么?

It's always seemed reasonable to dedicate a worker thread for the consumer side of these tasks. Should I write future programs using some construct from the TPL instead? Which one? Why?

推荐答案

您可以按照 Wilka 的建议使用长时间运行的 Task 来处理 BlockingCollection 中的项目.这是一个几乎可以满足您的应用程序要求的示例.你会看到类似这样的输出:

You can use a long running Task to process items from a BlockingCollection as suggested by Wilka. Here's an example which pretty much meets your applications requirements. You'll see output something like this:

Log from task B
Log from task A
Log from task B1
Log from task D
Log from task C

不是来自 A、B、C 和的输出D 出现是随机的,因为它们取决于线程的开始时间,但 B 总是出现在 B1 之前.

Not that outputs from A, B, C & D appear random because they depend on the start time of the threads but B always appears before B1.

public class LogItem 
{
    public string Message { get; private set; }

    public LogItem (string message)
    {
        Message = message;
    }
}

public void Example()
{
    BlockingCollection<LogItem> _queue = new BlockingCollection<LogItem>();

    // Start queue listener...
    CancellationTokenSource canceller = new CancellationTokenSource();
    Task listener = Task.Factory.StartNew(() =>
        {
            while (!canceller.Token.IsCancellationRequested)
            {
                LogItem item;
                if (_queue.TryTake(out item))
                    Console.WriteLine(item.Message);
            }
        },
    canceller.Token, 
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);

    // Add some log messages in parallel...
    Parallel.Invoke(
        () => { _queue.Add(new LogItem("Log from task A")); },
        () => { 
            _queue.Add(new LogItem("Log from task B")); 
            _queue.Add(new LogItem("Log from task B1")); 
        },
        () => { _queue.Add(new LogItem("Log from task C")); },
        () => { _queue.Add(new LogItem("Log from task D")); });

    // Pretend to do other things...
    Thread.Sleep(1000);

    // Shut down the listener...
    canceller.Cancel();
    listener.Wait();
}

这篇关于如何(以及是否)使用 TPL 编写单消费者队列?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆