如何使用带锁的多处理队列 [英] How to use Multiprocessing Queue with Lock
问题描述
发布的代码启动两个异步进程.第一个publisher
Process 将数据发布到Queue
,第二个subscriber
Process 读取Queue
中的数据并将其记录到控制台.
The posted code starts two async Processes. The first publisher
Process publishes data to the Queue
, the second subscriber
Process reads the data from the Queue
and logs it to console.
为了保证Queue不被同时访问,在从Queue
中获取数据之前,subscribe
函数首先执行lock.acquire()
,然后用data = q.get()
获取数据,最后用lock.release()
语句释放锁.
To make sure the Queue is not accessed at the same time, before getting the data from the Queue
, the subscribe
function first executes lock.acquire()
, then gets the data with data = q.get()
and finally releases the lock with the lock.release()
statement.
publish
函数中使用了相同的锁定释放序列.但是自从在 publish
中获取 lock
后,我不得不注释两行函数使脚本停止.为什么?
The same locking-releasing sequence is used in publish
function. But I had to comment two lines since acquiring the lock
in publish
function brings the script to halt. Why?
import multiprocessing, time, uuid, logging
log = multiprocessing.log_to_stderr()
log.setLevel(logging.INFO)
queue = multiprocessing.Queue()
lock = multiprocessing.Lock()
def publish(q):
for i in range(20):
data = str(uuid.uuid4())
# lock.acquire()
q.put(data)
# lock.release()
log.info('published: %s to queue: %s' % (data, q))
time.sleep(0.2)
def subscribe(q):
while True:
lock.acquire()
data = q.get()
lock.release()
log.info('.......got: %s to queue: %s' % (data, q))
time.sleep(0.1)
publisher = multiprocessing.Process(target=publish, args=(queue,))
publisher.start()
subscriber = multiprocessing.Process(target=subscribe, args=(queue,))
subscriber.start()
推荐答案
multiprocessing
队列是线程和进程安全的.
multiprocessing
Queues are thread and process safe.
并且它们支持内部block
ing机制(参见get
/put
方法的签名).
在您的情况下,您不需要 lock
.
And they support internal block
ing mechanism (see the signatures of get
/put
methods).
You don't need a lock
in your case.
import multiprocessing, time, uuid, logging
log = multiprocessing.log_to_stderr()
log.setLevel(logging.INFO)
queue = multiprocessing.Queue()
def publish(q):
for i in range(20):
data = str(uuid.uuid4())
q.put(data)
log.info('published: %s to queue: %s' % (data, q))
time.sleep(0.2)
q.put(None)
def subscribe(q):
while True:
data = q.get()
if data is None:
log.info('....... end of queue consumption')
break
log.info('.......got: %s to queue: %s' % (data, q))
time.sleep(0.1)
publisher = multiprocessing.Process(target=publish, args=(queue,))
publisher.start()
subscriber = multiprocessing.Process(target=subscribe, args=(queue,))
subscriber.start()
publisher.join()
subscriber.join()
示例输出:
[INFO/Process-1] child process calling self.run()
[INFO/Process-1] published: eff77f27-e13e-4d55-9f34-4ea5fc464fc8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] child process calling self.run()
[INFO/Process-2] .......got: eff77f27-e13e-4d55-9f34-4ea5fc464fc8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 264fcf94-9195-4145-b0a1-5ddd787bee1f to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 264fcf94-9195-4145-b0a1-5ddd787bee1f to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 2e040d60-5fd4-45c9-98e6-f0032e13dae8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 2e040d60-5fd4-45c9-98e6-f0032e13dae8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: afe406ea-20cc-41b3-9cf5-c1dbea11580d to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: afe406ea-20cc-41b3-9cf5-c1dbea11580d to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: e14a6c04-e2fe-4394-a189-5c57c5a98bc8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: e14a6c04-e2fe-4394-a189-5c57c5a98bc8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: fb90ba87-8090-4ec6-9ac1-85bcaa2bb3f6 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: fb90ba87-8090-4ec6-9ac1-85bcaa2bb3f6 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 85ab36ee-36f3-4c67-8260-7c41ea82a5d5 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 85ab36ee-36f3-4c67-8260-7c41ea82a5d5 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: d4dce917-9b5c-470a-9063-bfb0221da55f to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: d4dce917-9b5c-470a-9063-bfb0221da55f to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 1e1e2f02-932d-418d-b603-8c90f4699423 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 1e1e2f02-932d-418d-b603-8c90f4699423 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 0b80f1df-c803-4c00-be4d-fad39213829b to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 0b80f1df-c803-4c00-be4d-fad39213829b to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: f6afef2a-42f8-4330-b995-26ee41f833a5 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: f6afef2a-42f8-4330-b995-26ee41f833a5 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: abd85275-dc9f-478c-8528-23217db79631 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: abd85275-dc9f-478c-8528-23217db79631 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: c4fad226-8c83-4e52-beae-1cb9a825d370 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: c4fad226-8c83-4e52-beae-1cb9a825d370 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: ca16fd7d-ff51-4019-970c-f55c2b3c0db2 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: ca16fd7d-ff51-4019-970c-f55c2b3c0db2 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: eca614df-89da-47d0-a8a5-90b56fadb922 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: eca614df-89da-47d0-a8a5-90b56fadb922 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 046903d7-0fd8-4af0-ac49-a22efdc9c029 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 046903d7-0fd8-4af0-ac49-a22efdc9c029 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 7904d15a-7b04-4968-a52c-cfd8d822b921 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 7904d15a-7b04-4968-a52c-cfd8d822b921 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 8543b520-9a7e-4e22-afb3-a4880d910482 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 8543b520-9a7e-4e22-afb3-a4880d910482 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: b4e98f5e-ce63-4f11-a6f7-b7d36020deb0 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: b4e98f5e-ce63-4f11-a6f7-b7d36020deb0 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 4a5eb231-4ccf-41e1-a0d6-ca41a50a6fd6 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 4a5eb231-4ccf-41e1-a0d6-ca41a50a6fd6 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] process shutting down
[INFO/Process-2] ....... end of queue consumption
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-2] process shutting down
[INFO/Process-2] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
Process finished with exit code 0
这篇关于如何使用带锁的多处理队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!