Producer Consumer using semaphores and mutexes in Python
Problem Description
I'm trying to understand how to implement a Queue with a bounded buffer size that can be used by multiple producers and consumers using Python Semaphores. Here's my implementation:
from threading import Lock, Semaphore

class Q:
    def __init__(self, size):
        self.buff = [None] * size
        self.end = 0
        self.start = 0
        self.size = size
        self.end_lock = Lock()    # protect end from races across multiple producers
        self.start_lock = Lock()  # protect start from races across multiple consumers
        self.open = Semaphore(size)    # blocks until there's space to produce
        self.closed = Semaphore(size)  # blocks until there's an item to consume
        for _ in range(size):  # acquire all of closed up front so consumers start blocked
            self.closed.acquire()

    def put(self, val):
        self.open.acquire()
        with self.end_lock:
            self.buff[self.end] = val
            self.end = (self.end + 1) % self.size
        self.closed.release()

    def get(self):
        self.closed.acquire()
        with self.start_lock:
            val = self.buff[self.start % self.size]
            self.start = (self.start + 1) % self.size
        self.open.release()
        return val
Is this implementation bug-free? Could this be simplified further to use fewer mutexes/semaphores?
Looks good to me. The semaphores prevent concurrent producers and consumers from writing and reading too much, and the locks prevent concurrent producers or consumers from modifying the end or start indices simultaneously.
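As a side note, the loop of acquire() calls in __init__ can be avoided: threading.Semaphore takes its initial count as a constructor argument, so the consumer-side semaphore can simply start at zero. A minimal sketch of just the constructor, keeping the question's field names:

```python
from threading import Lock, Semaphore

class Q:
    def __init__(self, size):
        self.buff = [None] * size
        self.end = 0
        self.start = 0
        self.size = size
        self.end_lock = Lock()
        self.start_lock = Lock()
        self.open = Semaphore(size)  # counts free slots: starts full
        self.closed = Semaphore(0)   # counts stored items: starts empty, no acquire loop needed
```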
The two semaphores are definitely necessary. You could remove one of the locks and use it in both get and put to protect both the start and the end index, which wouldn't allow consumers and producers to access the queue simultaneously. (CPython's queue implementation does this.)
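A sketch of what that single-lock variant could look like (hypothetical class and field names; the one shared Lock appears in both methods, so a producer and a consumer never touch the indices concurrently):

```python
from threading import Lock, Semaphore

class SingleLockQ:
    def __init__(self, size):
        self.buff = [None] * size
        self.read_index = 0
        self.write_index = 0
        self.lock = Lock()            # shared by put() and get()
        self.open = Semaphore(size)   # counts free slots
        self.closed = Semaphore(0)    # counts stored items

    def put(self, val):
        self.open.acquire()           # wait for a free slot
        with self.lock:
            self.buff[self.write_index] = val
            self.write_index = (self.write_index + 1) % len(self.buff)
        self.closed.release()         # one more item available

    def get(self):
        self.closed.acquire()         # wait for an item
        with self.lock:
            val = self.buff[self.read_index]
            self.read_index = (self.read_index + 1) % len(self.buff)
        self.open.release()           # one more slot free
        return val
```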
I would remove the size attribute in favor of len(self.buff), though, and rename the start and end indices to read_index and write_index respectively (and the locks as well). Also, I think you could access the buffer without holding the locks, because individual list operations are thread-safe in CPython (the GIL serializes them):
def put(self, val):
    self.open.acquire()
    with self.write_lock:
        index = self.write_index
        self.write_index = (self.write_index + 1) % len(self.buff)
    self.buff[index] = val
    self.closed.release()

def get(self):
    self.closed.acquire()
    with self.read_lock:
        index = self.read_index
        self.read_index = (self.read_index + 1) % len(self.buff)
    val = self.buff[index]
    self.open.release()
    return val
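For completeness, the __init__ these revised methods assume might look like this (a sketch only; the field names follow the renames suggested above, and the consumer-side semaphore starts at zero rather than being drained in a loop):

```python
from threading import Lock, Semaphore

class Q:
    def __init__(self, size):
        self.buff = [None] * size  # len(self.buff) replaces the size attribute
        self.read_index = 0        # formerly start
        self.write_index = 0       # formerly end
        self.read_lock = Lock()    # formerly start_lock
        self.write_lock = Lock()   # formerly end_lock
        self.open = Semaphore(size)  # free slots
        self.closed = Semaphore(0)   # stored items: nothing to consume yet
```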
Here's a small test program I used to play around:
from threading import Lock, Thread

def producer(queue, start, end, step):
    for value in range(start, end, step):
        queue.put(value)
    print('Producer finished')

def consumer(queue, count, result, lock):
    local_result = []
    for _ in range(count):
        local_result.append(queue.get())
    with lock:
        result.update(local_result)
    print('Consumer finished')
def main():
    value_count = 500000
    producer_count = 50
    consumer_count = 50
    assert value_count % producer_count == 0
    assert value_count % consumer_count == 0
    queue = Q(123)  # the class under review
    result = set()
    lock = Lock()
    producers = [Thread(target=producer, args=(queue, i, value_count, producer_count))
                 for i in range(producer_count)]
    consumers = [Thread(target=consumer, args=(queue, value_count // consumer_count, result, lock))
                 for _ in range(consumer_count)]
    for p in producers:
        p.start()
    for c in consumers:
        c.start()
    for p in producers:
        p.join()
    for c in consumers:
        c.join()
    if len(result) != value_count:
        raise ValueError('Result size is %d instead of %d' % (len(result), value_count))

if __name__ == '__main__':
    main()