如何实现我自己的高级生产者/消费者场景? [英] How do I implement my own advanced Producer/Consumer scenario?

查看:25
本文介绍了如何实现我自己的高级生产者/消费者场景?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

注意:
我对我的问题进行了彻底的修改.您可以通过更改历史记录查看原始问题.

NOTE:
i did a complete rework of my question. you can see the original question via the change-history.

我需要一个强大"的人.队列,提供以下功能:

i'm in the need of a "mighty" queue, which provides following functionalities:

  • 我对一组对象有一定的范围.这意味着 A 组B 组、...将有自己的队列
  • 我正在组范围线程中填充队列线程 A(生产者)
  • 我正在读取组范围线程中的队列线程 B(消费者)
  • i have a certain scope for a group of objects. that means that Group A, Group B, ... will have their own queue
  • i'm filling a queue in a group-scoped thread Thread A (Producer)
  • i'm reading a queue in a group-scoped thread Thread B (Consumer)

所以我会有以下场景:

  1. 队列中没有任何项目(因为作业是用空的目标组"调用的):线程 B 应该退出循环
  2. 队列中当前没有项目,因为线程 A 正在处理要排队的项目:线程 B 应该等待
  3. 队列中有项目:线程 B 应该能够出列并处理项目
  4. 队列中没有项目,因为 线程 A 没有更多的项目要入队:线程 B 应该退出循环
  1. there is and will be no item in the queue (as the jobs were called with an empty "targetgroup"): Thread B should escape the loop
  2. there is currently no item in the queue, as Thread A is working on the item to enqueue: Thread B should wait
  3. there are items in the queue: Thread B should be able to dequeue and process the item
  4. there is no item in the queue, as Thread A has no more items to enqueue: Thread B should escape the loop

现在我想出了以下实现:

now i came up with following implementation:

public class MightyQueue<T>
  where T : class
{
    private readonly Queue<T> _queue = new Queue<T>();

    private bool? _runable;
    private volatile bool _completed;

    public bool Runable
    {
        get
        {
            while (!this._runable.HasValue)
            {
                Thread.Sleep(100);
            }
            return this._runable ?? false;
        }
        set
        {
            this._runable = value;
        }
    }

    public void Enqueue(T item)
    {
        if (item == null)
        {
            throw new ArgumentNullException("item");
        }

        this._queue.Enqueue(item);
    }

    public void CompleteAdding()
    {
        this._completed = true;
    }

    public bool TryDequeue(out T item)
    {
        if (!this.Runable)
        {
            item = null;
            return false;
        }
        while (this._queue.Count == 0)
        {
            if (this._completed)
            {
                item = null;
                return false;
            }
            Thread.Sleep(100);
        }
        item = this._queue.Dequeue();
        return true;
    }
}

然后会被使用

制片人

if (anythingToWorkOn)
{
    myFooMightyQueueInstance.Runable = false;
}
else
{
    myFooMightyQueueInstance.Runable = true;
    while (condition)
    {
        myFooMightyQueueInstance.Enqueue(item);
    }
    myFooMightyQueueInstance.CompleteAdding();
}

消费者

if (!myFooMightyQueueInstance.Runable)
{
    return;
}

T item;
while (myFooMightyQueueInstance.TryDequeue(out item))
{
    //work with item
}

但我相信,这种方法是错误的,因为我在那里使用了一些 Thread.Sleep()-stuff(不能有一些 waitHandle 或其他东西吗?)...我也不是关于算法本身......谁能帮帮我?

but i believe, that this approach is wrong, as i'm using some Thread.Sleep()-stuff in there (can't be there some waitHandle or something else?)... i'm not about the algo itself either ... can anyone please help me out?

推荐答案

你应该从一个通用的生产者-消费者队列开始并使用它.在队列中实现这一点并不是一个好主意,因为这会阻止您使用信号量来向线程发出信号(或者,您可以在队列中使用公共信号量,但这是一个真的坏主意).

You should begin with a generic Producer-Consumer queue and use that. Implementing this inside a Queue is not such a good idea, as this prevents you from using semaphores to signal threads (or, you could have public semaphores in your Queue, but that's a really bad idea).

一旦线程 A 将单个工作项入队,它必须发出信号量通知线程 B.当线程 B 完成处理所有项时,它应该发出信号量通知其他所有人它已完成.您的主线程应该等待第二个信号量知道一切都已完成.

As soon as the thread A has enqueued a single work item, it must signal a semaphore to notify thread B. When thread B has finished processing all items, it should signal a semaphore to notify everyone else that it has finished. Your main thread should be waiting for this second semaphore to know that everything is done.

首先,你有一个生产者和一个消费者:

First, you have a producer and a consumer:

public interface IProducer<T> : IStoppable
{
    /// <summary>
    /// Notifies clients when a new item is produced.
    /// </summary>
    event EventHandler<ProducedItemEventArgs<T>> ItemProduced;
}

public interface IConsumer<T> : IStoppable
{
    /// <summary>
    /// Performs processing of the specified item.
    /// </summary>
    /// <param name="item">The item.</param>
    void ConsumeItem(T item);
}

public interface IStoppable
{
    void Stop();
}

因此,在您的情况下,创建邮件的类需要触发 ItemProduced 事件,发送邮件的类需要实现 ConsumeItem.

So, in your case, class creating the mail will need to fire the ItemProduced event, and the class sending it will need to implement ConsumeItem.

然后你将这两个实例传递给 Worker 的一个实例:

And then you pass these two instances to an instance of Worker:

public class Worker<T>
{
    private readonly Object _lock = new Object();
    private readonly Queue<T> _queuedItems = new Queue<T>();
    private readonly AutoResetEvent _itemReadyEvt = new AutoResetEvent(false);
    private readonly IProducer<T> _producer;
    private readonly IConsumer<T> _consumer;
    private volatile bool _ending = false;
    private Thread _workerThread;

    public Worker(IProducer<T> producer, IConsumer<T> consumer)
    {
        _producer = producer;
        _consumer = consumer;
    }

    public void Start(ThreadPriority priority)
    {
        _producer.ItemProduced += Producer_ItemProduced;
        _ending = false;

        // start a new thread
        _workerThread = new Thread(new ThreadStart(WorkerLoop));
        _workerThread.IsBackground = true;
        _workerThread.Priority = priority;
        _workerThread.Start();
    } 

    public void Stop()
    {
        _producer.ItemProduced -= Producer_ItemProduced;
        _ending = true;

        // signal the consumer, in case it is idle
        _itemReadyEvt.Set();
        _workerThread.Join();
    }

    private void Producer_ItemProduced
         (object sender, ProducedItemEventArgs<T> e)
    {
        lock (_lock) { _queuedItems.Enqueue(e.Item); }

        // notify consumer thread
        _itemReadyEvt.Set();
    }

    private void WorkerLoop()
    {
        while (!_ending)
        {
            _itemReadyEvt.WaitOne(-1, false);

            T singleItem = default(T);
            lock (_lock)
            {
               if (_queuedItems.Count > 0)
               {
                   singleItem = _queuedItems.Dequeue();
               }
            }


            while (singleItem != null)
            {
                try
                {
                    _consumer.ConsumeItem(singleItem);
                }
                catch (Exception ex)
                {
                    // handle exception, fire an event
                    // or something. Otherwise this
                    // worker thread will die and you
                    // will have no idea what happened
                }

                lock (_lock)
                {
                    if (_queuedItems.Count > 0)
                    {
                        singleItem = _queuedItems.Dequeue();
                    }
                }
            }

         }

    } // WorkerLoop

} // Worker

这是大体思路,可能还需要一些额外的调整.

That's the general idea, there may be some additional tweaks needed.

要使用它,你需要让你的类实现这两个接口:

To use it, you need to have your classes implement these two interfaces:

IProducer<IMail> mailCreator = new MailCreator();
IConsumer<IMail> mailSender = new MailSender();

Worker<IMail> worker = new Worker<IMail>(mailCreator, mailSender);
worker.Start();

// produce an item - worker will add it to the
// queue and signal the background thread
mailCreator.CreateSomeMail();

// following line will block this (calling) thread
// until all items are consumed
worker.Stop();

这样做的好处在于:

  • 您可以拥有任意数量的工人
  • 多个工人可以接受来自同一生产者的物品
  • 多个工作人员可以将项目分派给同一个消费者(尽管这意味着您需要考虑消费者是以线程安全的方式实现的)

这篇关于如何实现我自己的高级生产者/消费者场景?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆