如何在多线程中添加使用者 [英] how to add consumers in multithreading

查看:46
本文介绍了如何在多线程中添加使用者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

此用于同步的程序用于监视器,并且它具有将读取和写入的生产者,使用者和缓冲区. 如何添加另外2个使用者(因此我在该程序中有3个),该程序仍然可以正常工作?

this program for synchronization used monitors and it has a producer and consumer and buffer that will read and write how can I add another 2 consumers(so I have 3 in this program) and the program still works properly?

  // Synchronized.cs
    // Showing multiple threads modifying a shared object with synchronization.

    using System;
    using System.Threading;

    namespace Synchronized
    {
        // this class synchronizes access to an integer
        public class HoldIntegerSynchronized
        {
            // buffer shared by producer and consumer threads 
            private int buffer = -1;

            // occupiedBufferCount maintains count of occupied buffers
            private int occupiedBufferCount = 0;

            // property Buffer
            public int Buffer
            {
                get
                {
                    // obtain lock on this object
                    Monitor.Enter(this);

                    // if there is no data to read, place invoking 
                    // thread in WaitSleepJoin state
                    if (occupiedBufferCount == 0)
                    {
                        Console.WriteLine(
                           Thread.CurrentThread.Name + " tries to read.");

                        DisplayState("Buffer empty. " +
                           Thread.CurrentThread.Name + " waits.");

                        Monitor.Wait(this);
                    }

                    // indicate that producer can store another value 
                    // because a consumer just retrieved buffer value
                   --occupiedBufferCount;
                  //  occupiedBufferCount -= 1;
                    DisplayState(
                       Thread.CurrentThread.Name + " reads " + buffer);

                    // tell waiting thread (if there is one) to 
                    // become ready to execute (Started state)
                    Monitor.Pulse(this);

                    // Get copy of buffer before releasing lock. 
                    // It is possible that the producer could be
                    // assigned the processor immediately after the
                    // monitor is released and before the return 
                    // statement executes. In this case, the producer 
                    // would assign a new value to buffer before the 
                    // return statement returns the value to the 
                    // consumer. Thus, the consumer would receive the 
                    // new value. Making a copy of buffer and 
                    // returning the copy ensures that the
                    // consumer receives the proper value.
                    int bufferCopy = buffer;

                    // release lock on this object
                    Monitor.Exit(this);

                    return bufferCopy;

                } // end get

                set
                {
                    // acquire lock for this object
                    Monitor.Enter(this);

                    // if there are no empty locations, place invoking
                    // thread in WaitSleepJoin state
                    if (occupiedBufferCount == 1)
                    {
                        Console.WriteLine(
                           Thread.CurrentThread.Name + " tries to write.");

                        DisplayState("Buffer full. " +
                           Thread.CurrentThread.Name + " waits.");

                        Monitor.Wait(this);
                    }

                    // set new sharedInt value
                    buffer = value;

                    // indicate producer cannot store another value 
                    // until consumer retrieves current sharedInt value

                    ++occupiedBufferCount;
                 //   occupiedBufferCount += 1;
                    DisplayState(
                       Thread.CurrentThread.Name + " writes " + buffer);

                    // tell waiting thread (if there is one) to 
                    // become ready to execute (Started state)
                    Monitor.Pulse(this);

                    // release lock on this object
                    Monitor.Exit(this);

                } // end set

            } // end property Buffer

            // display current operation and buffer state
            public void DisplayState(string operation)
            {
                Console.WriteLine("{0,-35}{1,-9}{2}\n",
                   operation, buffer, occupiedBufferCount);
            }

        } // end class HoldIntegerSynchronized

        // class Producer's Produce method controls a thread that
        // stores values from 1 to 4 in sharedLocation
        class Producer
        {
            private HoldIntegerSynchronized sharedLocation;
            private Random randomSleepTime;

            // constructor
            public Producer(
               HoldIntegerSynchronized shared, Random random)
            {
                sharedLocation = shared;
                randomSleepTime = random;
            }

            // store values 1-4 in object sharedLocation
            public void Produce()
            {
                // sleep for random interval upto 3000 milliseconds
                // then set sharedLocation's Buffer property
                for (int count = 1; count <= 10; count++)
                {
                    Thread.Sleep(randomSleepTime.Next(1, 3000));
                    sharedLocation.Buffer = count;
                }

                Console.WriteLine(Thread.CurrentThread.Name +
                   " done producing.\nTerminating " +
                   Thread.CurrentThread.Name + ".\n");

            } // end method Produce

        } // end class Producer

        // class Consumer's Consume method controls a thread that
        // loops four times and reads a value from sharedLocation
        class Consumer
        {
            private HoldIntegerSynchronized sharedLocation;
            private Random randomSleepTime;

            // constructor
            public Consumer(
               HoldIntegerSynchronized shared, Random random)
            {
                sharedLocation = shared;
                randomSleepTime = random;
            }

            // read sharedLocation's value four times
            public void Consume()
            {
                int sum = 0;

                // get current thread
                Thread current = Thread.CurrentThread;

                // sleep for random interval upto 3000 milliseconds
                // then add sharedLocation's Buffer property value
                // to sum
                for (int count = 1; count <= 10; count++)
                {
                    Thread.Sleep(randomSleepTime.Next(1, 3000));
                    sum += sharedLocation.Buffer;
                }

                Console.WriteLine(Thread.CurrentThread.Name +
                   " read values totaling: " + sum +
                   ".\nTerminating " + Thread.CurrentThread.Name + ".\n");

            } // end method Consume

        } // end class Consumer

        // this class creates producer and consumer threads
        class SharedCell
        {
            // create producer and consumer threads and start them
            static void Main(string[] args)
            {
                // create shared object used by threads
                HoldIntegerSynchronized holdInteger =
                   new HoldIntegerSynchronized();

                // Random object used by each thread
                Random random = new Random();

                // create Producer and Consumer objects
                Producer producer =
                   new Producer(holdInteger, random);

                Consumer consumer =
                   new Consumer(holdInteger, random);

                Consumer consumer2=
                   new Consumer(holdInteger, random);

                Consumer consumer3=
                   new Consumer(holdInteger, random);

                // output column heads and initial buffer state
                Console.WriteLine("{0,-35}{1,-9}{2}\n",
                   "Operation", "Buffer", "Occupied Count");
                holdInteger.DisplayState("Initial state");

                // create threads for producer and consumer and set 
                // delegates for each thread
                Thread producerThread =
                   new Thread(new ThreadStart(producer.Produce));
                producerThread.Name = "Producer";

                Thread consumerThread =
                   new Thread(new ThreadStart(consumer.Consume));
                consumerThread.Name = "Consumer";



                // start each thread
                producerThread.Start();
                consumerThread.Start();

            } // end method Main

        } // end class SharedCell
    } // end end!.

推荐答案

在这些情况下,我发现应该编写一个类,该类实现使用condvars的线程安全和阻塞队列(在.NET中,Monitor.Pulse( )和Monitor.PulseAll())等待队列.

I find that it behooves me in these situations to write a class which implements a thread safe and blocking queue that uses condvars (in .NET, Monitor.Pulse() and Monitor.PulseAll()) to wait on a queue.

对于需要添加和删除元素的线程数,此解决方案是安全的.

This solution is safe for as many threads as you want to have adding and removing elements.

一个例子:

using System;
using System.Threading;


namespace MCSharp {


    /** <summary>
        A thread safe, blocking queue.</summary>
        <remarks>
        All members of this class are thread safe.</remarks>
    */
    public class MessageQueue<T> {


        private LinkedQueue<T> messagequeue=new LinkedQueue<T>();
        private Object waitobject=new Object();
        private Int32 numwaitingthreads=0;
        private Object emptyobject=new Object();


        /** <summary>
            Returns the number of items currently waiting in the queue.</summary>
        */
        public Int32 MessageCount {

            get {   lock (waitobject) return messagequeue.Count;    }

        }


        /** <summary>
            Returns the number of threads currently waiting for items to be added to the queue.</summary>
        */
        public Int32 ThreadCount {

            get {   lock (waitobject) return numwaitingthreads; }

        }


        /** <summary>
            Creates a new queue.</summary>
        */
        public MessageQueue () {    }


        /** <summary>
            Adds a new item to the back of the queue.</summary>
            <param name="message">
            The item to add to the queue.</param>
        */
        public void Enqueue (T message) {

            lock (waitobject) {

                messagequeue.Enqueue(message);

                Monitor.Pulse(waitobject);

            }

        }


        /** <summary>
            Removes an item from the front of the queue.</summary>
            <remarks>
            If there is currently no item at the front of the queue the thread will block
            until there is one, and then return with that item.</remarks>
            <returns>
            The item from the front of the queue.</returns>
        */
        public T Dequeue () {

            lock (waitobject) {

                while (messagequeue.Count==0) {

                    numwaitingthreads++;

                    Monitor.Wait(waitobject);

                    numwaitingthreads--;

                }

                lock (emptyobject) {

                    Monitor.PulseAll(emptyobject);

                    return messagequeue.Dequeue();

                }

            }

        }


        /** <summary>
            Waits for the queue to empty.</summary>
            <remarks>
            The calling thread blocks until the thread's <see cref="MCSharp.MessageQueue{T}.MessageCount">
            message count</see> is zero.</remarks>
        */
        public void WaitForEmpty () {

            while (true) {

                Monitor.Enter(waitobject);

                try {

                    if (messagequeue.Count==0) {

                        return;

                    }

                    Monitor.Enter(emptyobject);

                } finally {

                    Monitor.Exit(waitobject);

                }

                try {

                    Monitor.Wait(emptyobject);

                } finally {

                    Monitor.Exit(emptyobject);

                }

            }

        }


    }


}

它指的是"LinkedQueue"类,这里是:

It refers to the "LinkedQueue" class, here that is:

using System;


namespace MCSharp {


    /** <summary>
        Implements a queue based around a singly linked list.</summary>
        <remarks>
        The .NET's built in queue implementation uses a dynamically-resizing array
        for its data storage.</remarks>
    */
    public class LinkedQueue<T> {


        private class SinglyLinkedListNode<NodeT> {


            public SinglyLinkedListNode<NodeT> Next=null;
            public NodeT Item;


            public SinglyLinkedListNode (NodeT item) {

                Item=item;

            }


        }


        private SinglyLinkedListNode<T> head=null;
        private SinglyLinkedListNode<T> tail=null;
        private Int32 count=0;


        /** <summary>
            Returns the number of items in the queue.</summary>
        */
        public Int32 Count {

            get {   return count;   }

        }


        /** <summary>
            Creates a new queue.</summary>
        */
        public LinkedQueue () { }


        /** <summary>
            Adds an item to the rear of the queue.</summary>
            <param name="item">
            The item to add to the queue.</param>
        */
        public void Enqueue (T item) {

            SinglyLinkedListNode<T> newnode=new SinglyLinkedListNode<T>(item);

            count++;

            if (head==null) {

                head=newnode;
                tail=newnode;

            } else {

                tail.Next=newnode;
                tail=newnode;

            }

        }


        /** <summary>
            Returns the item at the front of the queue.</summary>
            <returns>
            The item at the front of the queue.</returns>
        */
        public T Dequeue () {

            if (count==0) throw new InvalidOperationException();

            T returnthis=head.Item;

            if (head.Next==null) tail=null;

            head=head.Next;

            count--;

            return returnthis;

        }


    }


}

这篇关于如何在多线程中添加使用者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆