在等待来自多处理的“值"或“数组"时如何处理工作进程块? [英] How do you make a worker process block while waiting for Value or Array from multiprocessing?

查看:51
本文介绍了在等待来自多处理的“值"或“数组"时如何处理工作进程块?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

文档显示了一个示例使用multiprocessing库中的ValueArray在进程之间共享状态:

This document shows an example to share state between processes using Value and Array from multiprocessing library:

从多处理导入过程,值,数组

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
    a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

它将打印

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

我的问题是

  1. 您如何继续将信息传递给另一个进程,而不是在创建工作进程时传递信息?

  1. How to you continue to pass information to the other process instead of during creation of the worker process?

如何通过这种机制使工作进程阻塞(或挂起)以等待父进程的事件?

How could you make the worker process to block (or suspend) to wait for event from the parent process via this mechanism?

我的平台是Windows10.共享的内存可以在进程之间共享,但是fork()或spawn()进程不能继承信号量,锁,队列等.

My platform is Windows 10. Shared memory could be shared among processes but fork() or spawn() processes could not inherit semaphore, lock, queue, etc.

谢谢.

[更新1]

@Manu-Valdés的演示作品.但是我做了一个例子,没有用,也许您可​​以帮助发现问题.

The demo given by @Manu-Valdés works. But I did an example does not work, perhaps you could help to spot the problem.

%%file ./examples/multiprocessing_pool5.py
# This code definitely will not work in Windows as queue object is not fork() along.
import multiprocessing
import os

def f1(q):
  x = q.get(True) # Block until something is in the queue
  if x == 55:
    raise Exception('I do not like 55!')
  elif x == 100:
    return
  else:
    print(f'f1({x}) -> {x*x}')


def f2(q):
  x = q.get(True) # Block until something is in the queue
  if x == 55:
    raise Exception('I do not like 55!')
  elif x == 100:
    return
  else:
    print(f'f2({x}) -> {x*x}')


def wp_init(q):
  #global queue
  #queue = q  # Point to the global queue in each process
  print(f'I am initialized')


def success_cb(result):
  print(f'Success returns = {result}')


def failure_cb(result):
  print(f'Failure returns = {result}')


if __name__ == '__main__':
  np = os.cpu_count()  # Number of cores per CPU
  queue = multiprocessing.Queue()
  pool = multiprocessing.Pool(np, initializer=wp_init, initargs=(queue,))

  for x in range(100):
    if x % 2 == 0:
      f = f1
    else:
      f = f2
  pool.apply_async(f, args=(queue,), callback=success_cb, error_callback=failure_cb)

  for x in range(100):
    queue.put(x)

  # Terminate them but I do not know how to loop through the processes
  for _ in range(100):
    queue.put(100)  # Terminate it

  pool.close()
  pool.join()

错误是

I am initialized
I am initialized
I am initialized
I am initialized
Failure returns = Queue objects should only be shared between processes through inheritance

推荐答案

要以线程安全的方式进行通信,可以使用

To communicate in a thread-safe manner you can use Queue. The get() method blocks if the queue is empty, and waits until a new element is put():

from multiprocessing import Process, Queue

def f(q):
    while True:
        element = q.get()
        print(element)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    q.put([42, None, 'hello'])
    p.join()

这篇关于在等待来自多处理的“值"或“数组"时如何处理工作进程块?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆