C# Monitor/Semaphore Concurrency Produce-Consumer for Buffer [英] C# Monitor/Semaphore Concurrency Produce-Consumer for Buffer

查看:62
本文介绍了C# Monitor/Semaphore Concurrency Produce-Consumer for Buffer的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在努力解决典型的生产者-消费者问题.我有多个生产者和一个消费者.有 n 个生产者线程,每个线程调用 SetOne(OrderObject order),消费者调用 GetOne().Buffer 类用作包含 的缓冲区.n 个单元格,并且每个单元格在 Buffer 类可用时被消耗.出于某种原因,下面的设置有时会起作用,但并不总是消耗所有单元格.我已经包括了所有涉及客户端、服务器和缓冲区的类.此外,我可以展示一个运行这个原型的简单原型.仅供参考 - 所使用的方法是首先将信号量的数量初始化为与正在使用的缓冲区相同的大小,然后在缓冲区打开后找到一个打开的单元格,然后对该单元格执行操作.

I am working on solving a problem with the typical producer-consumer problem. I have multiple producers, and one consumer. There are n producer threads that each call SetOne(OrderObject order) and the consumer calls GetOne(). The Buffer class is used as the buffer which contains < n cells and are each cell are consumed when available by the Buffer class. For some reason, the setup below is working sometimes, but is not always consuming all of the cells. I have included all the classes that are involved Client,Server and Buffer. Additionally, I can show a simple prototype that is running this prototype. FYI - The method being used is to first initialize the amount of semaphores to the same size of the buffer being used, then once a buffer opens, to find an open cell and then perform an operation on that cell.

public class Buffer
{
    public static OrderObject[] BufferCells;
    public static Semaphore _pool { get; set; }
    public static void SetOne(OrderObject order)
    {

        _pool.WaitOne();
        try
        {
            Monitor.Enter(BufferCells);
            for (int i = 0; i < BufferCells.Length - 1; i++)
            {
                BufferCells[i] = order;
                Console.WriteLine(String.Format("Client {0} Produced {1}", BufferCells[i].Id, BufferCells[i].Id));
            }
        }
        finally
        {
            Monitor.Exit(BufferCells);
            _pool.Release();
        } 
    }

    public static OrderObject GetOne()
    {
        _pool.WaitOne();
        OrderObject value = null;
        try
        {
            Monitor.Enter(BufferCells);
            for (int i = 0; i < BufferCells.Length - 1; i++)
            {
                if (BufferCells[i].Id != "-1")
                {
                    value = BufferCells[i];
                    BufferCells[i] = new OrderObject() { Id = "-1" }; /*Clear Cell*/
                    Console.WriteLine(String.Format("        Server Consumed {0}", value.Id));
                    break;
                }
            }
        }
        finally
        {
            Monitor.Exit(BufferCells);
            _pool.Release();
        }
        return value;
    }
 }


public class Client
{
    public int Id {get;set;}

    public void Run()
    {
         /*Produce*/
         Buffer.SetOne(Id);

    }
}

public class Server
{
    public void Run()
    {
        while(true)
        {
             Buffer.GetOne();
        }
    }
}

public class Program
{
    public static void Main(string[] args)
    {
        /*Initialize 2 Open Semaphores*/
        int numCells = 2;
        Semaphore pool = new Semaphore(numCells, numCells);

        /*Initialize BufferCells with Empty OrderObjects*/
        List<OrderObject> OrderObjects = new List<OrderObject>();
        for (var i = 0; i < numCells; i++)
        {
            OrderObjects.Add(new OrderObject() { Id = "-1" });
        }
        Buffer.BufferCells = OrderObjects.ToArray();

        /*Initialize Consumer Thread*/
        Server server = new Server(pool);
        Thread serverThread = new Thread(new ThreadStart(server.Run));


        /*Initialize Producer Objects*/
        List<Client> clients = new List<Client>();
        for (int i = 0; i <= 20; i++)
        {
            /*Create 5000 Clients*/
            Client client = new Client(i.ToString(), pool, new OrderObject() { Id = i.ToString() });
            clients.Add(client);
        }

        /*Start Each Client Thread*/
        List<Thread> clientThreads = new List<Thread>();
        foreach (var client in clients)
        {
            Thread t = new Thread(new ThreadStart(client.Run));
            clientThreads.Add(t);
        }

        /*Start Server Thread*/
        serverThread.Start();

        /*Start Each Producer Thread*/
        clientThreads.ForEach(p => p.Start());

        /*Start Consumer Thread*/
        Console.ReadLine();
    }
}

我猜我遇到了以下问题之一:死锁、活锁或饥饿.出于某种原因,服务器 无法使用生成并添加到单元缓冲区的所有订单对象.不确定修复是什么.

I am guessing that I am running into one of the following problems: Deadlock,Livelock or starvation. For some reason Server is failing to consume all of the order object(s) produced and added to the cell buffer. Not sure what the fix is.

推荐答案

好的,让我们分解一下这段代码想要做什么..

Ok, let's decompose what this code is trying to do..

public class Buffer
{
    public static OrderObject[] BufferCells;
    public static Semaphore _pool { get; set; }

//设置一个

    public static void SetOne(OrderObject order)
    {

//为什么这里需要信号量?

// Why do you need a semaphore here?

        _pool.WaitOne();
        try
        {

//信号量,是多余的,因为 Monitor.Enter 是一个限制性更强的锁.

// Semaphore, is redundant since the Monitor.Enter is a more restrictive lock.

            Monitor.Enter(BufferCells);

//嗯?我以为这应该是 SetOne?不是 SetEverything?我只能假设您的意图是设置其中一个单元格,而让其余单元格可用于设置或获取.如果这是您要实现的目标,则队列似乎是更合适的数据结构.更好的是 BlockingCollection 也有锁定/阻止机制启用.

// Huh? I thought this was supposed to be SetOne? Not SetEverything? I can only assume that your intention was to set one of the cells while let the rest be available for setting or getting. A Queue seems like a more appropriate data structure if this is what you are trying to achieve. Better yet a BlockingCollection that also has locking/blocking mechanics enabled.

            for (int i = 0; i < BufferCells.Length - 1; i++)
            {

                BufferCells[i] = order;
                Console.WriteLine(String.Format("Client {0} Produced {1}", BufferCells[i].Id, BufferCells[i].Id));
            }
        }
        finally
        {
            Monitor.Exit(BufferCells);
            _pool.Release();
        } 
    }

    public static OrderObject GetOne()
    {

//同样,这个信号量在这里似乎不是很有用

// Again this semaphore does not seem to be very helpful here

        _pool.WaitOne();
        OrderObject value = null;
        try
        {

//因为监视器又是一个限制性更强的锁

// Because again the monitor is a more restrictive lock

            Monitor.Enter(BufferCells);
            for (int i = 0; i < BufferCells.Length - 1; i++)
            {
                if (BufferCells[i].Id != "-1")
                {
                    value = BufferCells[i];
                    BufferCells[i] = new OrderObject() { Id = "-1" }; /*Clear Cell*/
                    Console.WriteLine(String.Format("        Server Consumed {0}", value.Id));
                    break;
                }
            }
        }
        finally
        {
            Monitor.Exit(BufferCells);
            _pool.Release();
        }
        return value;
    }
 }

总结:

  • 这段代码使用了冗余锁,这是不必要的
  • 这段代码一次性将缓冲区中的所有单元格设置在一个排他锁下,这似乎首先违背了缓冲区的目的
  • 似乎此代码试图实现的目标已经在 BlockingCollection.无需重新发明轮子!:)

祝你好运!

这篇关于C# Monitor/Semaphore Concurrency Produce-Consumer for Buffer的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆