Producer Consumer using semaphores and mutexes in Python


Question


I'm trying to understand how to implement a Queue with a bounded buffer size that can be used by multiple producers and consumers using Python Semaphores. Here's my implementation:

from threading import Lock, Semaphore

class Q:

    def __init__(self, size):
        self.buff = [None]*size
        self.end = 0
        self.start = 0
        self.size = size
        self.end_lock = Lock()  # protect end from race across multiple producers
        self.start_lock = Lock()  # protect start from race across multiple consumers
        self.open = Semaphore(size)  # block till there's space to produce
        self.closed = Semaphore(size) # block till there's item to consume
        for _ in range(size):  # initialize with all closed acquired so that consumer is blocked
            self.closed.acquire()

    def put(self, val):
        self.open.acquire()
        with self.end_lock:
            self.buff[self.end] = val
            self.end = (self.end+1)%self.size
        self.closed.release()

    def get(self):
        self.closed.acquire()
        with self.start_lock:
            val = self.buff[(self.start)%self.size]
            self.start = (self.start+1)%self.size
        self.open.release()
        return val

Is this implementation bug-free? Could this be simplified further to use fewer mutexes/semaphores?

Solution

Looks good to me. The semaphores prevent concurrent producers from overfilling the buffer and consumers from reading data that isn't there yet, and the locks prevent concurrent producers or consumers from modifying the end or start indices simultaneously.
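The blocking behavior the answer relies on can be seen in isolation: acquire on a semaphore whose count is zero blocks until some other thread calls release. A small self-contained demo (the thread and variable names here are my own, not from the post):

```python
from threading import Semaphore, Thread
import time

sem = Semaphore(0)      # starts at zero, like `closed` after the init loop
result = []

def waiter():
    sem.acquire()       # blocks until another thread releases
    result.append('woke')

t = Thread(target=waiter)
t.start()
time.sleep(0.1)
assert result == []     # still blocked: nothing has been released yet
sem.release()           # hand the waiter a permit
t.join()
assert result == ['woke']
```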

The two semaphores are definitely necessary. You could, however, remove one of the locks and use the remaining one in both get and put to protect both the start and the end index, which would simply prevent consumers and producers from accessing the queue simultaneously. (CPython's queue implementation does this.)
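A minimal sketch of that single-lock variant, assuming the index renaming suggested below; the class and attribute names are mine, not from the original post. It also initializes the consumer-side semaphore with Semaphore(0) rather than acquiring it size times in __init__, which is an equivalent simplification:

```python
from threading import Lock, Semaphore

class BoundedQueue:
    def __init__(self, size):
        self.buff = [None] * size
        self.read_index = 0
        self.write_index = 0
        self.lock = Lock()            # one lock guards both indices
        self.open = Semaphore(size)   # counts free slots
        self.closed = Semaphore(0)    # counts filled slots; starts empty

    def put(self, val):
        self.open.acquire()           # wait for a free slot
        with self.lock:
            self.buff[self.write_index] = val
            self.write_index = (self.write_index + 1) % len(self.buff)
        self.closed.release()         # signal one filled slot

    def get(self):
        self.closed.acquire()         # wait for a filled slot
        with self.lock:
            val = self.buff[self.read_index]
            self.read_index = (self.read_index + 1) % len(self.buff)
        self.open.release()           # signal one free slot
        return val
```

With a single lock, a producer and a consumer can never touch the buffer at the same time, which costs some concurrency but makes the invariants easier to reason about.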


I would remove the size attribute in favor of len(self.buff), though, and rename the start and end indices to read_index and write_index respectively (and the locks as well). Also, I think you could access the buffer without holding the locks (because individual list item reads and writes are thread-safe in CPython):

    def put(self, val):
        self.open.acquire()
        with self.write_lock:
            index = self.write_index
            self.write_index = (self.write_index + 1) % len(self.buff)
        self.buff[index] = val
        self.closed.release()

    def get(self):
        self.closed.acquire()
        with self.read_lock:
            index = self.read_index
            self.read_index = (self.read_index + 1) % len(self.buff)
        val = self.buff[index]
        self.open.release()
        return val


Here's a small test program I used to play around with:

from threading import Lock, Thread

def producer(queue, start, end, step):
    for value in range(start, end, step):
        queue.put(value)
    print('Producer finished')


def consumer(queue, count, result, lock):
    local_result = []
    for _ in range(count):
        local_result.append(queue.get())
    with lock:
        result.update(local_result)
    print('Consumer finished')


def main():
    value_count = 500000
    producer_count = 50
    consumer_count = 50
    assert value_count % producer_count == 0
    assert value_count % consumer_count == 0

    queue = Q(123)  # the queue class from the question
    result = set()
    lock = Lock()
    producers = [Thread(target=producer, args=(queue, i, value_count, producer_count)) for i in range(producer_count)]
    consumers = [Thread(target=consumer, args=(queue, value_count // consumer_count, result, lock)) for _ in range(consumer_count)]

    for p in producers:
        p.start()
    for c in consumers:
        c.start()

    for p in producers:
        p.join()
    for c in consumers:
        c.join()

    if len(result) != value_count:
        raise ValueError('Result size is %d instead of %d' % (len(result), value_count))


if __name__ == '__main__':
    main()
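For comparison, the CPython queue implementation mentioned above ships as the standard library's queue.Queue, which already provides a bounded, thread-safe blocking queue:

```python
# queue.Queue from the standard library: bounded, thread-safe, blocking.
# Internally it uses one mutex plus condition variables rather than two
# semaphores, matching the single-lock design discussed above.
import queue

q = queue.Queue(maxsize=3)  # holds at most 3 items; put() blocks when full
q.put('a')
q.put('b')
first = q.get()             # FIFO: returns 'a'
print(first, q.qsize())     # a 1
```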
