我该如何实现自己的先进生产者/消费者的情况? [英] How do I implement my own advanced Producer/Consumer scenario?

查看:137
本文介绍了我该如何实现自己的先进生产者/消费者的情况?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

注:
我做了我的问题一个完整的返工。您可以通过变化的历史看原来的问题。

NOTE:
i did a complete rework of my question. you can see the original question via the change-history.

我在需要的强大的队列中,它提供了以下功能:

i'm in the need of a "mighty" queue, which provides following functionalities:

  • 在我有一定的范围内为一组对象。这意味着 A组 B组,......都会有自己的队列
  • 即时填充在一组范围的线程队列中的线程A (监制)
  • 在我读队列在一组范围的线程线程B (消费者)
  • i have a certain scope for a group of objects. that means that Group A, Group B, ... will have their own queue
  • i'm filling a queue in a group-scoped thread Thread A (Producer)
  • i'm reading a queue in a group-scoped thread Thread B (Consumer)

,所以我将有以下方案:

so i will have following scenarios:

  1. 有,将在队列中没有的项目(如作业,被称为用一个空的targetgroup):线程B 应该逃避环
  2. 目前队列中没有的项目,如线程A 正在研究的项目排队:线程B 应等待
  3. 有队列中的项目:线程B 应该可以出队并处理项目
  4. 没有在队列中没有的项目,如线程A 已经没有更多的项目来排队:线程B 应该逃避环
  1. there is and will be no item in the queue (as the jobs were called with an empty "targetgroup"): Thread B should escape the loop
  2. there is currently no item in the queue, as Thread A is working on the item to enqueue: Thread B should wait
  3. there are items in the queue: Thread B should be able to dequeue and process the item
  4. there is no item in the queue, as Thread A has no more items to enqueue: Thread B should escape the loop

现在,我想出了以下实现:

now i came up with following implementation:

public class MightyQueue<T>
  where T : class
{
    private readonly Queue<T> _queue = new Queue<T>();

    private bool? _runable;
    private volatile bool _completed;

    public bool Runable
    {
        get
        {
            while (!this._runable.HasValue)
            {
                Thread.Sleep(100);
            }
            return this._runable ?? false;
        }
        set
        {
            this._runable = value;
        }
    }

    public void Enqueue(T item)
    {
        if (item == null)
        {
            throw new ArgumentNullException("item");
        }

        this._queue.Enqueue(item);
    }

    public void CompleteAdding()
    {
        this._completed = true;
    }

    public bool TryDequeue(out T item)
    {
        if (!this.Runable)
        {
            item = null;
            return false;
        }
        while (this._queue.Count == 0)
        {
            if (this._completed)
            {
                item = null;
                return false;
            }
            Thread.Sleep(100);
        }
        item = this._queue.Dequeue();
        return true;
    }
}

然后将用于

制作

if (anythingToWorkOn)
{
    myFooMightyQueueInstance.Runable = false;
}
else
{
    myFooMightyQueueInstance.Runable = true;
    while (condition)
    {
        myFooMightyQueueInstance.Enqueue(item);
    }
    myFooMightyQueueInstance.CompleteAdding();
}

消费者

if (!myFooMightyQueueInstance.Runable)
{
    return;
}

T item;
while (myFooMightyQueueInstance.TryDequeue(out item))
{
    //work with item
}

但我相信,这种做法是错误的,因为我使用了一些的Thread.Sleep() -stuff在那里(不能有一些的WaitHandle或别的东西吗?)......我不是对算法中自己要么... 任何人都可以请帮我吗?

but i believe, that this approach is wrong, as i'm using some Thread.Sleep()-stuff in there (can't be there some waitHandle or something else?)... i'm not about the algo itself either ... can anyone please help me out?

推荐答案

您应该以一个普通的生产者 - 消费者队列并使用它。在队列内部实现,这不是一个好主意,因为这prevents您使用旗语信号线(或者,你可以有公共信号灯队列中的,但是这是一个真的坏主意)。

You should begin with a generic Producer-Consumer queue and use that. Implementing this inside a Queue is not such a good idea, as this prevents you from using semaphores to signal threads (or, you could have public semaphores in your Queue, but that's a really bad idea).

一旦线程A已经入队一个工作项目,它必须发送一个信号量当线程B已处理完所有的项目,通知线程B.,它应该发送一个信号量,通知其他人,该公司已经完成了。你的主线程应该是在等待这第二个信号要知道,一切都做。

As soon as the thread A has enqueued a single work item, it must signal a semaphore to notify thread B. When thread B has finished processing all items, it should signal a semaphore to notify everyone else that it has finished. Your main thread should be waiting for this second semaphore to know that everything is done.

首先,你有一个生产者和消费者:

First, you have a producer and a consumer:

public interface IProducer<T> : IStoppable
{
    /// <summary>
    /// Notifies clients when a new item is produced.
    /// </summary>
    event EventHandler<ProducedItemEventArgs<T>> ItemProduced;
}

public interface IConsumer<T> : IStoppable
{
    /// <summary>
    /// Performs processing of the specified item.
    /// </summary>
    /// <param name="item">The item.</param>
    void ConsumeItem(T item);
}

public interface IStoppable
{
    void Stop();
}

那么,你的情况,班级创建邮件需要火 ItemProduced 事件,类发送就需要实施 ConsumeItem

So, in your case, class creating the mail will need to fire the ItemProduced event, and the class sending it will need to implement ConsumeItem.

然后你通过这两个实例工人的一个实例:

And then you pass these two instances to an instance of Worker:

public class Worker<T>
{
    private readonly Object _lock = new Object();
    private readonly Queue<T> _queuedItems = new Queue<T>();
    private readonly AutoResetEvent _itemReadyEvt = new AutoResetEvent(false);
    private readonly IProducer<T> _producer;
    private readonly IConsumer<T> _consumer;
    private volatile bool _ending = false;
    private Thread _workerThread;

    public Worker(IProducer<T> producer, IConsumer<T> consumer)
    {
        _producer = producer;
        _consumer = consumer;
    }

    public void Start(ThreadPriority priority)
    {
        _producer.ItemProduced += Producer_ItemProduced;
        _ending = false;

        // start a new thread
        _workerThread = new Thread(new ThreadStart(WorkerLoop));
        _workerThread.IsBackground = true;
        _workerThread.Priority = priority;
        _workerThread.Start();
    } 

    public void Stop()
    {
        _producer.ItemProduced -= Producer_ItemProduced;
        _ending = true;

        // signal the consumer, in case it is idle
        _itemReadyEvt.Set();
        _workerThread.Join();
    }

    private void Producer_ItemProduced
         (object sender, ProducedItemEventArgs<T> e)
    {
        lock (_lock) { _queuedItems.Enqueue(e.Item); }

        // notify consumer thread
        _itemReadyEvt.Set();
    }

    private void WorkerLoop()
    {
        while (!_ending)
        {
            _itemReadyEvt.WaitOne(-1, false);

            T singleItem = default(T);
            lock (_lock)
            {
               if (_queuedItems.Count > 0)
               {
                   singleItem = _queuedItems.Dequeue();
               }
            }


            while (singleItem != null)
            {
                try
                {
                    _consumer.ConsumeItem(singleItem);
                }
                catch (Exception ex)
                {
                    // handle exception, fire an event
                    // or something. Otherwise this
                    // worker thread will die and you
                    // will have no idea what happened
                }

                lock (_lock)
                {
                    if (_queuedItems.Count > 0)
                    {
                        singleItem = _queuedItems.Dequeue();
                    }
                }
            }

         }

    } // WorkerLoop

} // Worker

这是一般的想法,可能会有一些额外的调整需要。

That's the general idea, there may be some additional tweaks needed.

要使用它,你需要让你的类实现这两个接口:

To use it, you need to have your classes implement these two interfaces:

IProducer<IMail> mailCreator = new MailCreator();
IConsumer<IMail> mailSender = new MailSender();

Worker<IMail> worker = new Worker<IMail>(mailCreator, mailSender);
worker.Start();

// produce an item - worker will add it to the
// queue and signal the background thread
mailCreator.CreateSomeMail();

// following line will block this (calling) thread
// until all items are consumed
worker.Stop();

关于这个伟大的事情是:

The great thing about this is that:

  • 您可以有你喜欢的尽可能多的工人
  • 在多个工人可以接受来自同一个生产项目
  • 在多个工人可以派遣项目相同的消费者(尽管这意味着你需要采取的情况下,消费者是在一个线程安全的方式实现)

这篇关于我该如何实现自己的先进生产者/消费者的情况?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆