How do you make a worker process block while waiting for Value or Array from multiprocessing?
Question
This document shows an example of sharing state between processes using Value and Array from the multiprocessing library:
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])
It will print
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
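The snippet above only shares state; a bare Value gives the worker no way to block until the parent writes something. One way to add that (a sketch of my own, not from the original post: the Condition guarding the Value and all names here are illustrative) is to pair the shared Value with a multiprocessing.Condition:

```python
from multiprocessing import Process, Value, Condition

def f(n, cond):
    with cond:
        while n.value == 0.0:   # guard against spurious/early wake-ups
            cond.wait()          # block until the parent notifies
    print(f'worker saw {n.value}')

if __name__ == '__main__':
    num = Value('d', 0.0)
    cond = Condition()
    p = Process(target=f, args=(num, cond))
    p.start()
    with cond:
        num.value = 3.1415927   # write the shared state...
        cond.notify()           # ...then wake the blocked worker
    p.join()
```

The while-loop re-check of `n.value` matters: if the parent notifies before the worker reaches `wait()`, the worker still sees the non-zero value and skips the wait instead of blocking forever.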
My questions are:

- How do you continue to pass information to the other process, rather than only at creation of the worker process?
- How could you make the worker process block (or suspend) to wait for an event from the parent process via this mechanism?
My platform is Windows 10. Shared memory can be shared among processes, but fork()- or spawn()-started processes cannot inherit semaphores, locks, queues, etc.
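As a side note on that last point: even under the spawn start method (the default on Windows), synchronization primitives can still be handed to a child explicitly as Process constructor arguments, where they are pickled for the child. A minimal sketch (the `worker`/`ev` names are illustrative, not from the original post):

```python
import multiprocessing

def worker(started):
    # 'started' is an Event received as a Process argument at creation time
    started.wait()   # block until the parent sets the event
    print('parent signalled, worker resumes')

if __name__ == '__main__':
    ev = multiprocessing.Event()
    p = multiprocessing.Process(target=worker, args=(ev,))
    p.start()        # works under spawn: the Event travels via args=
    ev.set()         # unblock the worker
    p.join()
```

What spawn-started processes cannot do is pick up such objects implicitly from the parent's globals; passing them through `args=` (or an initializer) is the supported route.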
Thanks.
[Update 1]
The demo given by @Manu-Valdés works. But I made an example that does not work; perhaps you could help spot the problem.
%%file ./examples/multiprocessing_pool5.py
# This code definitely will not work on Windows, as the queue object is not fork()ed along.
import multiprocessing
import os

def f1(q):
    x = q.get(True)  # Block until something is in the queue
    if x == 55:
        raise Exception('I do not like 55!')
    elif x == 100:
        return
    else:
        print(f'f1({x}) -> {x*x}')

def f2(q):
    x = q.get(True)  # Block until something is in the queue
    if x == 55:
        raise Exception('I do not like 55!')
    elif x == 100:
        return
    else:
        print(f'f2({x}) -> {x*x}')

def wp_init(q):
    #global queue
    #queue = q  # Point to the global queue in each process
    print(f'I am initialized')

def success_cb(result):
    print(f'Success returns = {result}')

def failure_cb(result):
    print(f'Failure returns = {result}')

if __name__ == '__main__':
    np = os.cpu_count()  # Number of cores per CPU
    queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(np, initializer=wp_init, initargs=(queue,))
    for x in range(100):
        if x % 2 == 0:
            f = f1
        else:
            f = f2
        pool.apply_async(f, args=(queue,), callback=success_cb, error_callback=failure_cb)
    for x in range(100):
        queue.put(x)
    # Terminate them but I do not know how to loop through the processes
    for _ in range(100):
        queue.put(100)  # Terminate it
    pool.close()
    pool.join()
The error is
I am initialized
I am initialized
I am initialized
I am initialized
Failure returns = Queue objects should only be shared between processes through inheritance
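That error is raised because a plain multiprocessing.Queue cannot be pickled into apply_async arguments; it may only be inherited (e.g. via the Pool initializer). One common workaround, sketched here on a trimmed-down version of the example above (the return values are my own additions for illustration), is to pass a Manager().Queue() instead: a manager queue is a picklable proxy object, so it may travel through apply_async:

```python
import multiprocessing
import os

def f1(q):
    # q is now a manager-queue proxy, safe to receive via apply_async args
    x = q.get(True)  # block until something is in the queue
    if x == 100:
        return 'done'
    return x * x

if __name__ == '__main__':
    np = os.cpu_count()
    with multiprocessing.Manager() as mgr:
        queue = mgr.Queue()          # proxy object, picklable
        with multiprocessing.Pool(np) as pool:
            res = pool.apply_async(f1, args=(queue,))
            queue.put(7)             # pass information after creation
            print(res.get())         # -> 49
```

The proxy routes put()/get() through the manager process, which is slower than a raw Queue but sidesteps the inheritance restriction entirely.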
Answer
To communicate in a thread-safe manner you can use Queue. The get() method blocks if the queue is empty, and waits until a new element is put():
from multiprocessing import Process, Queue

def f(q):
    while True:
        element = q.get()    # blocks until the parent put()s something
        if element is None:  # sentinel: exit the loop so join() can return
            break
        print(element)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    q.put([42, None, 'hello'])
    q.put(None)              # tell the worker to stop
    p.join()