单一来源生产者到多个并行工作的消费者 [英] Single source producer to multiple cosumers working in parallel

查看:38
本文介绍了单一来源生产者到多个并行工作的消费者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想要一种队列,其中单个源在其中输入数据,另一方面会有消费者等待,当他们检测到队列非空时将开始执行数据,直到他们停止.但重要的是,如果队列被清空,他们仍将继续观察队列,以便如果有更多数据弹出,他们将能够使用它.我通过 多个消费者和多个生产者 因为消费者嵌套在生产者中,在我的情况下我不能这样做,因为我将有一个单一的来源和消费者承诺到队列中,直到我停止他们.因此不是串行执行,而是消费者和生产者并行执行.

I want to have a kind of queue in which a single source inputs data in it and on the other side there will be consumers waiting that when they detect that the queue is not empty will start to execute the data until they are halted. but its important that if the queue is emptied they will still remain watching the queue such that if more data pops in they will be able to consume it. What i found By multiple consumer and multiple producers as the consumers are nested in the producers where in my case i cant do that as i will have a single source and consumers committed to the queue till i stop them. therefore not in series but both the consumer and the producers are executing in parallel.

将在

Parallel.Invoke(() => producer(), () => consumers());

这样的问题是我将如何并行执行有时为空的队列的内容

the problem as such is how i will execute the content of a queue which is sometimes empty in parallel

推荐答案

您可以使用 BlockingCollection.

You can solve this relatively easily using a BlockingCollection<T>.

您可以使用一个作为队列,并将对它的引用传递给 producer() 和每个 consumers().

You can use one as a queue, and pass a reference to it to the producer() and each of the consumers().

您将调用 GetConsumingEnumerable() 来自每个消费者线程,并与 foreach 一起使用.

You'll be calling GetConsumingEnumerable() from each consumer thread, and using it with foreach.

生产者线程将向集合中添加项目,并将调用CompleteAdding() 当它完成生产时.这将自动使所有消费者线程退出它们的 foreach 循环.

The producer thread will add items to the collection, and will call CompleteAdding() when it has finished producing stuff. This will automatically make all the consumer threads exit their foreach loops.

这是一个基本示例(没有错误处理).对 Thread.Sleep() 的调用是为了模拟负载,不应在实际代码中使用.

Here's a basic example (with no error handling). The calls to Thread.Sleep() are to simulate load, and should not be used in real code.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            ThreadPool.SetMinThreads(10, 0); // To help the demo; not needed in real code.
            var plant = new ProcessingPlant();
            plant.Process();
            Console.WriteLine("Work complete.");
        }
    }

    public sealed class ProcessingPlant
    {
        private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();

        public void Process()
        {
            Parallel.Invoke(producer, consumers);
        }

        private void producer()
        {
            for (int i = 0; i < 100; ++i)
            {
                string item = i.ToString();
                Console.WriteLine("Producer is queueing {0}", item);
               _queue.Add(item);  // <- Here's where we add an item to the queue.
                Thread.Sleep(0);
            }

            _queue.CompleteAdding(); // <- Here's where we make all the consumers
        }                            //    exit their foreach loops.

        private void consumers()
        {
            Parallel.Invoke(
                () => consumer(1),
                () => consumer(2),
                () => consumer(3),
                () => consumer(4),
                () => consumer(5)
            );
        }

        private void consumer(int id)
        {
            Console.WriteLine("Consumer {0} is starting.", id);

            foreach (var item in _queue.GetConsumingEnumerable()) // <- Here's where we remove items.
            {
                Console.WriteLine("Consumer {0} read {1}", id, item);
                Thread.Sleep(0);
            }

            Console.WriteLine("Consumer {0} is stopping.", id);
        }
    }
}

(我知道这是使用一个额外的线程来启动消费者,但我这样做是为了避免混淆真正的重点——这是为了演示 BlockingCollection 的使用.)

(I know this is using an extra thread just to start the consumers, but I did it this way to avoid obscuring the real point - which is to demonstrate the use of BlockingCollection.)

这篇关于单一来源生产者到多个并行工作的消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆