如何在多线程中添加使用者 [英] how to add consumers in multithreading
问题描述
此用于同步的程序用于监视器,并且它具有将读取和写入的生产者,使用者和缓冲区. 如何添加另外2个使用者(因此我在该程序中有3个),该程序仍然可以正常工作?
this program for synchronization used monitors and it has a producer and consumer and buffer that will read and write how can I add another 2 consumers(so I have 3 in this program) and the program still works properly?
// Synchronized.cs
// Showing multiple threads modifying a shared object with synchronization.
using System;
using System.Threading;
namespace Synchronized
{
// this class synchronizes access to an integer
public class HoldIntegerSynchronized
{
// buffer shared by producer and consumer threads
private int buffer = -1;
// occupiedBufferCount maintains count of occupied buffers
private int occupiedBufferCount = 0;
// property Buffer
public int Buffer
{
get
{
// obtain lock on this object
Monitor.Enter(this);
// if there is no data to read, place invoking
// thread in WaitSleepJoin state
if (occupiedBufferCount == 0)
{
Console.WriteLine(
Thread.CurrentThread.Name + " tries to read.");
DisplayState("Buffer empty. " +
Thread.CurrentThread.Name + " waits.");
Monitor.Wait(this);
}
// indicate that producer can store another value
// because a consumer just retrieved buffer value
--occupiedBufferCount;
// occupiedBufferCount -= 1;
DisplayState(
Thread.CurrentThread.Name + " reads " + buffer);
// tell waiting thread (if there is one) to
// become ready to execute (Started state)
Monitor.Pulse(this);
// Get copy of buffer before releasing lock.
// It is possible that the producer could be
// assigned the processor immediately after the
// monitor is released and before the return
// statement executes. In this case, the producer
// would assign a new value to buffer before the
// return statement returns the value to the
// consumer. Thus, the consumer would receive the
// new value. Making a copy of buffer and
// returning the copy ensures that the
// consumer receives the proper value.
int bufferCopy = buffer;
// release lock on this object
Monitor.Exit(this);
return bufferCopy;
} // end get
set
{
// acquire lock for this object
Monitor.Enter(this);
// if there are no empty locations, place invoking
// thread in WaitSleepJoin state
if (occupiedBufferCount == 1)
{
Console.WriteLine(
Thread.CurrentThread.Name + " tries to write.");
DisplayState("Buffer full. " +
Thread.CurrentThread.Name + " waits.");
Monitor.Wait(this);
}
// set new sharedInt value
buffer = value;
// indicate producer cannot store another value
// until consumer retrieves current sharedInt value
++occupiedBufferCount;
// occupiedBufferCount += 1;
DisplayState(
Thread.CurrentThread.Name + " writes " + buffer);
// tell waiting thread (if there is one) to
// become ready to execute (Started state)
Monitor.Pulse(this);
// release lock on this object
Monitor.Exit(this);
} // end set
} // end property Buffer
// display current operation and buffer state
public void DisplayState(string operation)
{
Console.WriteLine("{0,-35}{1,-9}{2}\n",
operation, buffer, occupiedBufferCount);
}
} // end class HoldIntegerSynchronized
// class Producer's Produce method controls a thread that
// stores values from 1 to 4 in sharedLocation
class Producer
{
private HoldIntegerSynchronized sharedLocation;
private Random randomSleepTime;
// constructor
public Producer(
HoldIntegerSynchronized shared, Random random)
{
sharedLocation = shared;
randomSleepTime = random;
}
// store values 1-4 in object sharedLocation
public void Produce()
{
// sleep for random interval upto 3000 milliseconds
// then set sharedLocation's Buffer property
for (int count = 1; count <= 10; count++)
{
Thread.Sleep(randomSleepTime.Next(1, 3000));
sharedLocation.Buffer = count;
}
Console.WriteLine(Thread.CurrentThread.Name +
" done producing.\nTerminating " +
Thread.CurrentThread.Name + ".\n");
} // end method Produce
} // end class Producer
// class Consumer's Consume method controls a thread that
// loops four times and reads a value from sharedLocation
class Consumer
{
private HoldIntegerSynchronized sharedLocation;
private Random randomSleepTime;
// constructor
public Consumer(
HoldIntegerSynchronized shared, Random random)
{
sharedLocation = shared;
randomSleepTime = random;
}
// read sharedLocation's value four times
public void Consume()
{
int sum = 0;
// get current thread
Thread current = Thread.CurrentThread;
// sleep for random interval upto 3000 milliseconds
// then add sharedLocation's Buffer property value
// to sum
for (int count = 1; count <= 10; count++)
{
Thread.Sleep(randomSleepTime.Next(1, 3000));
sum += sharedLocation.Buffer;
}
Console.WriteLine(Thread.CurrentThread.Name +
" read values totaling: " + sum +
".\nTerminating " + Thread.CurrentThread.Name + ".\n");
} // end method Consume
} // end class Consumer
// this class creates producer and consumer threads
class SharedCell
{
// create producer and consumer threads and start them
static void Main(string[] args)
{
// create shared object used by threads
HoldIntegerSynchronized holdInteger =
new HoldIntegerSynchronized();
// Random object used by each thread
Random random = new Random();
// create Producer and Consumer objects
Producer producer =
new Producer(holdInteger, random);
Consumer consumer =
new Consumer(holdInteger, random);
Consumer consumer2=
new Consumer(holdInteger, random);
Consumer consumer3=
new Consumer(holdInteger, random);
// output column heads and initial buffer state
Console.WriteLine("{0,-35}{1,-9}{2}\n",
"Operation", "Buffer", "Occupied Count");
holdInteger.DisplayState("Initial state");
// create threads for producer and consumer and set
// delegates for each thread
Thread producerThread =
new Thread(new ThreadStart(producer.Produce));
producerThread.Name = "Producer";
Thread consumerThread =
new Thread(new ThreadStart(consumer.Consume));
consumerThread.Name = "Consumer";
// start each thread
producerThread.Start();
consumerThread.Start();
} // end method Main
} // end class SharedCell
} // end end!.
推荐答案
在这些情况下,我发现应该编写一个类,该类实现使用condvars的线程安全和阻塞队列(在.NET中,Monitor.Pulse( )和Monitor.PulseAll())等待队列.
I find that it behooves me in these situations to write a class which implements a thread safe and blocking queue that uses condvars (in .NET, Monitor.Pulse() and Monitor.PulseAll()) to wait on a queue.
对于需要添加和删除元素的线程数,此解决方案是安全的.
This solution is safe for as many threads as you want to have adding and removing elements.
一个例子:
using System;
using System.Threading;
namespace MCSharp {
/** <summary>
A thread safe, blocking queue.</summary>
<remarks>
All members of this class are thread safe.</remarks>
*/
public class MessageQueue<T> {
private LinkedQueue<T> messagequeue=new LinkedQueue<T>();
private Object waitobject=new Object();
private Int32 numwaitingthreads=0;
private Object emptyobject=new Object();
/** <summary>
Returns the number of items currently waiting in the queue.</summary>
*/
public Int32 MessageCount {
get { lock (waitobject) return messagequeue.Count; }
}
/** <summary>
Returns the number of threads currently waiting for items to be added to the queue.</summary>
*/
public Int32 ThreadCount {
get { lock (waitobject) return numwaitingthreads; }
}
/** <summary>
Creates a new queue.</summary>
*/
public MessageQueue () { }
/** <summary>
Adds a new item to the back of the queue.</summary>
<param name="message">
The item to add to the queue.</param>
*/
public void Enqueue (T message) {
lock (waitobject) {
messagequeue.Enqueue(message);
Monitor.Pulse(waitobject);
}
}
/** <summary>
Removes an item from the front of the queue.</summary>
<remarks>
If there is currently no item at the front of the queue the thread will block
until there is one, and then return with that item.</remarks>
<returns>
The item from the front of the queue.</returns>
*/
public T Dequeue () {
lock (waitobject) {
while (messagequeue.Count==0) {
numwaitingthreads++;
Monitor.Wait(waitobject);
numwaitingthreads--;
}
lock (emptyobject) {
Monitor.PulseAll(emptyobject);
return messagequeue.Dequeue();
}
}
}
/** <summary>
Waits for the queue to empty.</summary>
<remarks>
The calling thread blocks until the thread's <see cref="MCSharp.MessageQueue{T}.MessageCount">
message count</see> is zero.</remarks>
*/
public void WaitForEmpty () {
while (true) {
Monitor.Enter(waitobject);
try {
if (messagequeue.Count==0) {
return;
}
Monitor.Enter(emptyobject);
} finally {
Monitor.Exit(waitobject);
}
try {
Monitor.Wait(emptyobject);
} finally {
Monitor.Exit(emptyobject);
}
}
}
}
}
它指的是"LinkedQueue"类,这里是:
It refers to the "LinkedQueue" class, here that is:
using System;
namespace MCSharp {
/** <summary>
Implements a queue based around a singly linked list.</summary>
<remarks>
The .NET's built in queue implementation uses a dynamically-resizing array
for its data storage.</remarks>
*/
public class LinkedQueue<T> {
private class SinglyLinkedListNode<NodeT> {
public SinglyLinkedListNode<NodeT> Next=null;
public NodeT Item;
public SinglyLinkedListNode (NodeT item) {
Item=item;
}
}
private SinglyLinkedListNode<T> head=null;
private SinglyLinkedListNode<T> tail=null;
private Int32 count=0;
/** <summary>
Returns the number of items in the queue.</summary>
*/
public Int32 Count {
get { return count; }
}
/** <summary>
Creates a new queue.</summary>
*/
public LinkedQueue () { }
/** <summary>
Adds an item to the rear of the queue.</summary>
<param name="item">
The item to add to the queue.</param>
*/
public void Enqueue (T item) {
SinglyLinkedListNode<T> newnode=new SinglyLinkedListNode<T>(item);
count++;
if (head==null) {
head=newnode;
tail=newnode;
} else {
tail.Next=newnode;
tail=newnode;
}
}
/** <summary>
Returns the item at the front of the queue.</summary>
<returns>
The item at the front of the queue.</returns>
*/
public T Dequeue () {
if (count==0) throw new InvalidOperationException();
T returnthis=head.Item;
if (head.Next==null) tail=null;
head=head.Next;
count--;
return returnthis;
}
}
}
这篇关于如何在多线程中添加使用者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!