How to use Multiprocessing Queue with Lock

Question

The posted code starts two async Processes. The first, the publisher Process, publishes data to the Queue; the second, the subscriber Process, reads the data from the Queue and logs it to the console.

To make sure the Queue is not accessed at the same time, the subscribe function first executes lock.acquire() before getting data from the Queue, then gets the data with data = q.get(), and finally releases the lock with lock.release().

The same acquire/release sequence is used in the publish function. But I had to comment out those two lines, because acquiring the lock in publish brings the script to a halt. Why?

import multiprocessing, time, uuid, logging

log = multiprocessing.log_to_stderr()
log.setLevel(logging.INFO)

queue = multiprocessing.Queue()
lock = multiprocessing.Lock()

def publish(q):
    for i in range(20):
        data = str(uuid.uuid4())
        # lock.acquire()
        q.put(data)
        # lock.release()
        log.info('published: %s to queue: %s' % (data, q))
        time.sleep(0.2)

def subscribe(q):
    while True:
        lock.acquire()
        data = q.get()
        lock.release()
        log.info('.......got: %s to queue: %s' % (data, q))
        time.sleep(0.1)

publisher = multiprocessing.Process(target=publish, args=(queue,))
publisher.start()

subscriber = multiprocessing.Process(target=subscribe, args=(queue,))
subscriber.start()

Recommended answer

multiprocessing Queues are thread and process safe.

They also support blocking internally (see the block and timeout parameters in the signatures of the get/put methods), so you don't need a lock in your case.
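As for why the locked version halts, a likely explanation is a deadlock: subscribe takes the lock and then blocks inside q.get() on an empty queue, so publish can never acquire the lock to put the next item. The sketch below reproduces that ordering in a small, bounded way. The function name lock_is_stuck and its wait parameter are made up for illustration, and a thread stands in for the subscriber process only to keep the example short; the Queue and Lock are still the multiprocessing ones.

```python
import multiprocessing
import threading


def lock_is_stuck(wait=0.5):
    """Return True if a producer cannot get the lock while a consumer
    holds it inside a blocking q.get() on an empty queue.

    Hypothetical helper for illustration; not part of the original script.
    """
    q = multiprocessing.Queue()
    lock = multiprocessing.Lock()
    holding = threading.Event()

    def consumer():
        with lock:          # same order as subscribe(): take the lock first ...
            holding.set()   # signal that the lock is now held
            q.get()         # ... then block until an item arrives

    t = threading.Thread(target=consumer)
    t.start()
    holding.wait()          # from here on the consumer owns the lock

    # Producer side: same order as the commented-out lines in publish().
    # A timeout is used so the sketch returns instead of hanging forever.
    got_lock = lock.acquire(timeout=wait)
    if got_lock:
        lock.release()

    q.put('unblock')        # put WITHOUT the lock so the consumer can finish
    t.join()
    return not got_lock


if __name__ == '__main__':
    print(lock_is_stuck())
```

In the original script publish has no timeout on lock.acquire(), so it would wait indefinitely at that point. With the lock removed, the script from the question becomes: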

import multiprocessing, time, uuid, logging

log = multiprocessing.log_to_stderr()
log.setLevel(logging.INFO)

queue = multiprocessing.Queue()

def publish(q):
    for i in range(20):
        data = str(uuid.uuid4())
        q.put(data)
        log.info('published: %s to queue: %s' % (data, q))
        time.sleep(0.2)
    q.put(None)

def subscribe(q):
    while True:
        data = q.get()
        if data is None:
            log.info('....... end of queue consumption')
            break
        log.info('.......got: %s to queue: %s' % (data, q))
        time.sleep(0.1)

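if __name__ == '__main__':
    # The guard keeps child processes from re-running this block when the
    # module is re-imported under the spawn start method (Windows, macOS).
    publisher = multiprocessing.Process(target=publish, args=(queue,))
    publisher.start()

    subscriber = multiprocessing.Process(target=subscribe, args=(queue,))
    subscriber.start()

    # Wait for both processes; subscribe() exits once it reads the None sentinel.
    publisher.join()
    subscriber.join()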

Sample output:

[INFO/Process-1] child process calling self.run()
[INFO/Process-1] published: eff77f27-e13e-4d55-9f34-4ea5fc464fc8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] child process calling self.run()
[INFO/Process-2] .......got: eff77f27-e13e-4d55-9f34-4ea5fc464fc8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 264fcf94-9195-4145-b0a1-5ddd787bee1f to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 264fcf94-9195-4145-b0a1-5ddd787bee1f to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 2e040d60-5fd4-45c9-98e6-f0032e13dae8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 2e040d60-5fd4-45c9-98e6-f0032e13dae8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: afe406ea-20cc-41b3-9cf5-c1dbea11580d to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: afe406ea-20cc-41b3-9cf5-c1dbea11580d to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: e14a6c04-e2fe-4394-a189-5c57c5a98bc8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: e14a6c04-e2fe-4394-a189-5c57c5a98bc8 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: fb90ba87-8090-4ec6-9ac1-85bcaa2bb3f6 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: fb90ba87-8090-4ec6-9ac1-85bcaa2bb3f6 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 85ab36ee-36f3-4c67-8260-7c41ea82a5d5 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 85ab36ee-36f3-4c67-8260-7c41ea82a5d5 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: d4dce917-9b5c-470a-9063-bfb0221da55f to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: d4dce917-9b5c-470a-9063-bfb0221da55f to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 1e1e2f02-932d-418d-b603-8c90f4699423 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 1e1e2f02-932d-418d-b603-8c90f4699423 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 0b80f1df-c803-4c00-be4d-fad39213829b to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 0b80f1df-c803-4c00-be4d-fad39213829b to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: f6afef2a-42f8-4330-b995-26ee41f833a5 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: f6afef2a-42f8-4330-b995-26ee41f833a5 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: abd85275-dc9f-478c-8528-23217db79631 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: abd85275-dc9f-478c-8528-23217db79631 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: c4fad226-8c83-4e52-beae-1cb9a825d370 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: c4fad226-8c83-4e52-beae-1cb9a825d370 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: ca16fd7d-ff51-4019-970c-f55c2b3c0db2 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: ca16fd7d-ff51-4019-970c-f55c2b3c0db2 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: eca614df-89da-47d0-a8a5-90b56fadb922 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: eca614df-89da-47d0-a8a5-90b56fadb922 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 046903d7-0fd8-4af0-ac49-a22efdc9c029 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 046903d7-0fd8-4af0-ac49-a22efdc9c029 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 7904d15a-7b04-4968-a52c-cfd8d822b921 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 7904d15a-7b04-4968-a52c-cfd8d822b921 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 8543b520-9a7e-4e22-afb3-a4880d910482 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 8543b520-9a7e-4e22-afb3-a4880d910482 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: b4e98f5e-ce63-4f11-a6f7-b7d36020deb0 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: b4e98f5e-ce63-4f11-a6f7-b7d36020deb0 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] published: 4a5eb231-4ccf-41e1-a0d6-ca41a50a6fd6 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-2] .......got: 4a5eb231-4ccf-41e1-a0d6-ca41a50a6fd6 to queue: <multiprocessing.queues.Queue object at 0x103528780>
[INFO/Process-1] process shutting down
[INFO/Process-2] ....... end of queue consumption
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-2] process shutting down
[INFO/Process-2] process exiting with exitcode 0
[INFO/MainProcess] process shutting down

Process finished with exit code 0
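
The internal blocking mechanism mentioned in the answer can be seen directly through the block and timeout parameters of Queue.get(). The following is a minimal sketch, assuming you want a bounded wait instead of blocking forever; the helper name try_get and its default argument are hypothetical and not part of the answer's code.

```python
import multiprocessing
import queue  # only needed for the queue.Empty exception class


def try_get(q, timeout, default=None):
    """Return the next item from q, or `default` if nothing arrives
    within `timeout` seconds. Hypothetical helper for illustration."""
    try:
        # block=True with a timeout waits at most `timeout` seconds.
        return q.get(block=True, timeout=timeout)
    except queue.Empty:
        # multiprocessing.Queue raises queue.Empty when the wait times out.
        return default


if __name__ == '__main__':
    q = multiprocessing.Queue()
    print(try_get(q, 0.1))   # the queue is empty, so the wait times out
    q.put('hello')
    print(try_get(q, 1.0))   # an item is available, so it is returned
```

A subscriber written this way needs no external lock and can also notice when the publisher has gone quiet, which the plain q.get() in the loop above cannot do on its own.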