Consume from Queue with multiple threads/tasks


Question

I have a producer that gets users from a resource and places them into a ConcurrentQueue. What I want to do is use multiple consumers to process all the users and fetch their information from another resource.

  public void Populate(IEnumerable<Users> users){
     // This part runs on a single thread.
     foreach (var user in users)
        _queue.Enqueue(user);
  }

  public void Process(){
     // I want the queued users to be processed by multiple consumers
     // (say, multiple threads) so that I can finish processing them.
  }

My question is: should I use threads, tasks, or the ThreadPool?

I have seen this question: C# equivalent for Java ExecutorService.newSingleThreadExecutor(), or: how to serialize multi-threaded access to a resource

Recommended answer

Since you are already using a queuing mechanism, I suggest you use a BlockingCollection instead of a ConcurrentQueue, along with Parallel.Invoke().

There are some important things about BlockingCollection that make it nice to use:

  1. BlockingCollection lets the consuming threads take items from the collection in a thread-safe and natural manner, using foreach over GetConsumingEnumerable().
  2. The consuming foreach loop blocks automatically when the queue is empty, and continues when items become available.
  3. BlockingCollection provides an easy-to-use mechanism to signal the end of data. The queue owner simply calls queue.CompleteAdding(), and any foreach loops taking items from the queue exit automatically once the queue is completely empty.
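
Points 2 and 3 can be seen in isolation in the following minimal sketch. The class and method names (CompletionDemo, SumViaQueue) are made up for illustration and are not part of the original answer. A single consumer task drains the collection while the calling thread adds items and then signals completion:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

static class CompletionDemo // Hypothetical name, for illustration only.
{
    // Adds the numbers 1..count on the calling thread while one consumer
    // task sums them; returns the sum once the consumer loop has exited.
    public static int SumViaQueue(int count)
    {
        using (var queue = new BlockingCollection<int>())
        {
            var consumer = Task.Run(() =>
            {
                int sum = 0;
                // Blocks while the queue is empty; exits once CompleteAdding()
                // has been called and the queue has been drained.
                foreach (int item in queue.GetConsumingEnumerable())
                    sum += item;
                return sum;
            });

            for (int i = 1; i <= count; i++)
                queue.Add(i);

            queue.CompleteAdding(); // Signal the end of data.
            return consumer.Result; // Waits for the consumer to finish.
        }
    }

    static void Main()
    {
        Console.WriteLine(SumViaQueue(10)); // 1 + 2 + ... + 10 = 55
    }
}
```

Without the call to CompleteAdding(), the consumer's foreach would keep waiting for more items and consumer.Result would never return.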

You can use Parallel.Invoke() to start a number of threads, each of which uses foreach to iterate over the queue. (Parallel.Invoke() lets you give it an array of actions to run in parallel, which makes it quite simple to use.)

This is best illustrated with a sample program:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class User
    {
        public string Name;
    }

    class Program
    {
        readonly BlockingCollection<User> _queue = new BlockingCollection<User>();

        void run()
        {
            var background = Task.Factory.StartNew(process); // Start the processing threads.

            // Make up 50 sample users.
            var users = Enumerable.Range(0, 50).Select(n => new User{Name = n.ToString()});

            foreach (var user in users) // Add some sample data.
                _queue.Add(user);

            Console.WriteLine("Press <RETURN> to exit.");
            Console.ReadLine();
            _queue.CompleteAdding(); // Makes all the consuming foreach loops exit.
            background.Wait();
            Console.WriteLine("Exited.");
        }

        void process() // Process the input queue using several consumers.
        {
            int taskCount = 4;  // Let's use 4 threads.
            var actions = Enumerable.Repeat<Action>(processQueue, taskCount);
            Parallel.Invoke(actions.ToArray());
        }

        void processQueue()
        {
            foreach (User user in _queue.GetConsumingEnumerable())
                processUser(user);
        }

        void processUser(User user)
        {
            Console.WriteLine("Processing user " + user.Name);
            Thread.Sleep(200); // Simulate work.
        }

        static void Main()
        {
            new Program().run();
        }
    }
}

If you don't need to limit the number of concurrent threads and are happy to let .NET decide for you (not a bad idea), you can simplify the code quite a bit by removing processQueue() altogether and changing process() to:

void process() // Process the input queue.
{
    Parallel.ForEach(_queue.GetConsumingEnumerable(), processUser);
}

However, that does more locking than it needs to, so you're probably best off just using the original method (which doesn't suffer from that problem), or using the solution described here: http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx
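
A possible variation on the Parallel.ForEach form, offered as a sketch and not taken from the original answer or the linked post: if you want to keep Parallel.ForEach but still cap the number of consumers, ParallelOptions.MaxDegreeOfParallelism sets an upper bound, and a partitioner created with EnumerablePartitionerOptions.NoBuffering (available from .NET 4.5) makes the loop take one item at a time instead of buffering chunks. This addresses chunk buffering only; it is not claimed to remove the extra locking mentioned above. The names BoundedForEachDemo and ProcessAll are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class BoundedForEachDemo // Hypothetical name, for illustration only.
{
    // Processes every item in the queue with at most maxConsumers
    // concurrent workers and returns how many items were processed.
    public static int ProcessAll(BlockingCollection<int> queue, int maxConsumers)
    {
        int processed = 0;

        // NoBuffering makes Parallel.ForEach take one item at a time instead
        // of buffering chunks, so items are handled as soon as they arrive.
        var partitioner = Partitioner.Create(
            queue.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        var options = new ParallelOptions { MaxDegreeOfParallelism = maxConsumers };

        // Returns once CompleteAdding() has been called and the queue is empty.
        Parallel.ForEach(partitioner, options, item =>
        {
            Interlocked.Increment(ref processed); // Stand-in for real work.
        });

        return processed;
    }

    static void Main()
    {
        using (var queue = new BlockingCollection<int>())
        {
            for (int i = 0; i < 50; i++)
                queue.Add(i);
            queue.CompleteAdding();

            Console.WriteLine(ProcessAll(queue, 4)); // 50
        }
    }
}
```

MaxDegreeOfParallelism is an upper limit, not a guarantee: the scheduler may use fewer workers than requested.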
