python多处理进程池无法找到异步函数 [英] python multiprocessing process pool fails to find asynced function

查看:431
本文介绍了python多处理进程池无法找到异步函数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

相当简单的多处理示例.目标:

Pretty simple multiprocessing example. Goals:

  1. 使用mp.Pool
  2. 创建流程工作者池
  3. 进行某种转换(这里是line上的简单字符串操作)
  4. 将转换后的行推到mp.Queue
  5. 随后从主程序中的mp.Queue中进一步获取过程数据
  1. Create a pool of process workers using mp.Pool
  2. Do some sort of transformation (here a simple string operation on line)
  3. Push the transformed line to mp.Queue
  4. Further process data from that mp.Queue in the main program afterwards

因此,请执行以下操作:

So lets do this:

import multiprocessing as mp

使用mp.queue初始化异步进程

Init async processes with a mp.queue

def process_pool_init_per_process(q):
    global mp_queue
    mp_queue = q

真正初始化mp_pool

Really init the mp_pool

no_of_processes = 4
q = mp.Queue()
mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))

每个要异步处理的line都会被调用

This is getting called for every line to be proccesed async

def process_async_main(line):
    print(line)
    q.put(line + '_asynced')

现在让我们使用apply_async

line = "Hi, this is a test to test mp_queues with mp process pools"
handler = mp_pool.apply_async(process_async_main, (line))
mp_resp = handler.get()

从队列中读取

while not q.empty():
    print(q.get()) # This should be the inital line

失败:

python3 mp_process_example.py
Process ForkPoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 337, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'process_async_main' on <module '__main__' from 'mp_process_example.py'>

问题是:为什么多处理程序找不到主类?

要复制的完整代码:

import multiprocessing as mp

##### Init async processes
def process_pool_init_per_process(q):
    global mp_queue
    mp_queue = q

# Really init the mp_pool
no_of_processes = 4
q = mp.Queue()
mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))

#This is getting called for every line to be proccesed async
def process_async_main(line):
    print(line)
    q.put(line + '_asynced')

line = "Hi, this is a test to test mp_queues with mp process pools"
handler = mp_pool.apply_async(process_async_main, (line))
mp_resp = handler.get()

while not q.empty():
    print(q.get()) # This should be the inital line

推荐答案

好...我知道了...由于某些奇怪的原因, multiprocessing无法在其中同步功能与同步代码相同的文件.

Ok... I´ve got it... For some strange reason multiprocessing is not able to have the function to be asynced in the same file as the synchronized code.

编写如下代码:

asynced.py

##### Init async processes
def process_pool_init_per_process(q):
    global mp_queue
    mp_queue = q

##### Function to be asycned
def process_async_main(line):
    print(line)
    mp_queue.put(line + '_asynced')

而不是mp_process_example.py:

import multiprocessing as mp
from asynced import process_async_main, process_pool_init_per_process


# Really init the mp_pool
no_of_processes = 4
q = mp.Queue()
mp_pool = mp.Pool(no_of_processes, process_pool_init_per_process, (q,))

line = "Hi, this is a test to test mp_queues with mp process pools"
handler = mp_pool.apply_async(process_async_main, (line,))
mp_resp = handler.get()

while not q.empty():
    print(q.get()) # This should be the inital line + "_asynced"

按预期工作:

$ python3 mp_process_example.py
Hi, this is a test to test mp_queues with mp process pools
Hi, this is a test to test mp_queues with mp process pools_asynced

这篇关于python多处理进程池无法找到异步函数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆