我该如何实现自己的先进生产者/消费者的情况? [英] How do I implement my own advanced Producer/Consumer scenario?
问题描述
注:的
我做了我的问题一个完整的返工。您可以通过变化的历史看原来的问题。
NOTE:
i did a complete rework of my question. you can see the original question via the change-history.
我在需要的强大的队列中,它提供了以下功能:
i'm in the need of a "mighty" queue, which provides following functionalities:
- 在我有一定的范围内为一组对象。这意味着 A组 B组,......都会有自己的队列
- 即时填充在一组范围的线程队列中的线程A (监制)
- 在我读队列在一组范围的线程线程B (消费者)
- i have a certain scope for a group of objects. that means that Group A, Group B, ... will have their own queue
- i'm filling a queue in a group-scoped thread Thread A (Producer)
- i'm reading a queue in a group-scoped thread Thread B (Consumer)
,所以我将有以下方案:
so i will have following scenarios:
- 有,将在队列中没有的项目(如作业,被称为用一个空的targetgroup):线程B 应该逃避环
- 目前队列中没有的项目,如线程A 正在研究的项目排队:线程B 应等待
- 有队列中的项目:线程B 应该可以出队并处理项目
- 没有在队列中没有的项目,如线程A 已经没有更多的项目来排队:线程B 应该逃避环
- there is and will be no item in the queue (as the jobs were called with an empty "targetgroup"): Thread B should escape the loop
- there is currently no item in the queue, as Thread A is working on the item to enqueue: Thread B should wait
- there are items in the queue: Thread B should be able to dequeue and process the item
- there is no item in the queue, as Thread A has no more items to enqueue: Thread B should escape the loop
现在,我想出了以下实现:
now i came up with following implementation:
public class MightyQueue<T>
where T : class
{
private readonly Queue<T> _queue = new Queue<T>();
private bool? _runable;
private volatile bool _completed;
public bool Runable
{
get
{
while (!this._runable.HasValue)
{
Thread.Sleep(100);
}
return this._runable ?? false;
}
set
{
this._runable = value;
}
}
public void Enqueue(T item)
{
if (item == null)
{
throw new ArgumentNullException("item");
}
this._queue.Enqueue(item);
}
public void CompleteAdding()
{
this._completed = true;
}
public bool TryDequeue(out T item)
{
if (!this.Runable)
{
item = null;
return false;
}
while (this._queue.Count == 0)
{
if (this._completed)
{
item = null;
return false;
}
Thread.Sleep(100);
}
item = this._queue.Dequeue();
return true;
}
}
然后将用于
制作
if (anythingToWorkOn)
{
myFooMightyQueueInstance.Runable = false;
}
else
{
myFooMightyQueueInstance.Runable = true;
while (condition)
{
myFooMightyQueueInstance.Enqueue(item);
}
myFooMightyQueueInstance.CompleteAdding();
}
消费者
if (!myFooMightyQueueInstance.Runable)
{
return;
}
T item;
while (myFooMightyQueueInstance.TryDequeue(out item))
{
//work with item
}
但我相信,这种做法是错误的,因为我使用了一些的Thread.Sleep()
-stuff在那里(不能有一些的WaitHandle或别的东西吗?)......我不是对算法中自己要么...
任何人都可以请帮我吗?
but i believe, that this approach is wrong, as i'm using some Thread.Sleep()
-stuff in there (can't be there some waitHandle or something else?)... i'm not about the algo itself either ...
can anyone please help me out?
推荐答案
您应该以一个普通的生产者 - 消费者队列并使用它。在队列内部实现,这不是一个好主意,因为这prevents您使用旗语信号线(或者,你可以有公共信号灯队列中的,但是这是一个真的坏主意)。
You should begin with a generic Producer-Consumer queue and use that. Implementing this inside a Queue is not such a good idea, as this prevents you from using semaphores to signal threads (or, you could have public semaphores in your Queue, but that's a really bad idea).
一旦线程A已经入队一个工作项目,它必须发送一个信号量当线程B已处理完所有的项目,通知线程B.,它应该发送一个信号量,通知其他人,该公司已经完成了。你的主线程应该是在等待这第二个信号要知道,一切都做。
As soon as the thread A has enqueued a single work item, it must signal a semaphore to notify thread B. When thread B has finished processing all items, it should signal a semaphore to notify everyone else that it has finished. Your main thread should be waiting for this second semaphore to know that everything is done.
首先,你有一个生产者和消费者:
First, you have a producer and a consumer:
public interface IProducer<T> : IStoppable
{
/// <summary>
/// Notifies clients when a new item is produced.
/// </summary>
event EventHandler<ProducedItemEventArgs<T>> ItemProduced;
}
public interface IConsumer<T> : IStoppable
{
/// <summary>
/// Performs processing of the specified item.
/// </summary>
/// <param name="item">The item.</param>
void ConsumeItem(T item);
}
public interface IStoppable
{
void Stop();
}
那么,你的情况,班级创建邮件需要火 ItemProduced
事件,类发送就需要实施 ConsumeItem
。
So, in your case, class creating the mail will need to fire the ItemProduced
event, and the class sending it will need to implement ConsumeItem
.
然后你通过这两个实例工人
的一个实例:
And then you pass these two instances to an instance of Worker
:
public class Worker<T>
{
private readonly Object _lock = new Object();
private readonly Queue<T> _queuedItems = new Queue<T>();
private readonly AutoResetEvent _itemReadyEvt = new AutoResetEvent(false);
private readonly IProducer<T> _producer;
private readonly IConsumer<T> _consumer;
private volatile bool _ending = false;
private Thread _workerThread;
public Worker(IProducer<T> producer, IConsumer<T> consumer)
{
_producer = producer;
_consumer = consumer;
}
public void Start(ThreadPriority priority)
{
_producer.ItemProduced += Producer_ItemProduced;
_ending = false;
// start a new thread
_workerThread = new Thread(new ThreadStart(WorkerLoop));
_workerThread.IsBackground = true;
_workerThread.Priority = priority;
_workerThread.Start();
}
public void Stop()
{
_producer.ItemProduced -= Producer_ItemProduced;
_ending = true;
// signal the consumer, in case it is idle
_itemReadyEvt.Set();
_workerThread.Join();
}
private void Producer_ItemProduced
(object sender, ProducedItemEventArgs<T> e)
{
lock (_lock) { _queuedItems.Enqueue(e.Item); }
// notify consumer thread
_itemReadyEvt.Set();
}
private void WorkerLoop()
{
while (!_ending)
{
_itemReadyEvt.WaitOne(-1, false);
T singleItem = default(T);
lock (_lock)
{
if (_queuedItems.Count > 0)
{
singleItem = _queuedItems.Dequeue();
}
}
while (singleItem != null)
{
try
{
_consumer.ConsumeItem(singleItem);
}
catch (Exception ex)
{
// handle exception, fire an event
// or something. Otherwise this
// worker thread will die and you
// will have no idea what happened
}
lock (_lock)
{
if (_queuedItems.Count > 0)
{
singleItem = _queuedItems.Dequeue();
}
}
}
}
} // WorkerLoop
} // Worker
这是一般的想法,可能会有一些额外的调整需要。
That's the general idea, there may be some additional tweaks needed.
要使用它,你需要让你的类实现这两个接口:
To use it, you need to have your classes implement these two interfaces:
IProducer<IMail> mailCreator = new MailCreator();
IConsumer<IMail> mailSender = new MailSender();
Worker<IMail> worker = new Worker<IMail>(mailCreator, mailSender);
worker.Start();
// produce an item - worker will add it to the
// queue and signal the background thread
mailCreator.CreateSomeMail();
// following line will block this (calling) thread
// until all items are consumed
worker.Stop();
关于这个伟大的事情是:
The great thing about this is that:
- 您可以有你喜欢的尽可能多的工人
- 在多个工人可以接受来自同一个生产项目
- 在多个工人可以派遣项目相同的消费者(尽管这意味着你需要采取的情况下,消费者是在一个线程安全的方式实现)
这篇关于我该如何实现自己的先进生产者/消费者的情况?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!