Pool.apply_async():未执行嵌套函数 [英] Pool.apply_async(): nested function is not executed

查看:1447
本文介绍了Pool.apply_async():未执行嵌套函数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经熟悉Python的multiprocessing模块.以下代码按预期工作:

I am getting familiar with Python's multiprocessing module. The following code works as expected:

#outputs 0 1 2 3
from multiprocessing import Pool
def run_one(x):
    print x
    return

pool = Pool(processes=12)
for i in range(4):
    pool.apply_async(run_one, (i,))
pool.close()
pool.join() 

但是,现在,如果我在上面的代码中包装一个函数,则不会执行print语句(或至少将输出重定向):

Now, however, if I wrap a function around the above code, the print statements are not executed (or the output is redirected at least):

#outputs nothing
def run():
    def run_one(x):
        print x
        return    

    pool = Pool(processes=12)
    for i in range(4):    
        pool.apply_async(run_one, (i,))
    pool.close()
    pool.join()

如果将run_one定义移到run之外,则在调用run()时输出再次是预期的:

If I move the run_one definition outside of run, the output is the expected one again, when I'm calling run():

#outputs 0 1 2 3
def run_one(x):
    print x
    return

def run():    
    pool = Pool(processes=12)
    for i in range(4):       
        pool.apply_async(run_one, (i,))
    pool.close()
    pool.join() 

我在这里想念什么?为什么第二个片段不打印任何内容?如果我只是调用run_one(i)函数而不是使用apply_async,则这三个代码都输出相同.

What am I missing here? Why isn't the second snippet printing anything? If I simply call the run_one(i) function instead of using apply_async, all the three codes output the same.

推荐答案

Pool需要对发送给其工作进程的所有内容进行腌制(序列化).酸洗实际上仅保存函数的名称,而酸洗则需要按名称重新导入该函数. 为此,必须在顶层定义函数,嵌套函数不能被子项导入,并且已经尝试对其进行腌制会引发异常:

Pool needs to pickle (serialize) everything it sends to its worker-processes. Pickling actually only saves the name of a function and unpickling requires re-importing the function by name. For that to work, the function needs to be defined at the top-level, nested functions won't be importable by the child and already trying to pickle them raises an exception:

from multiprocessing.connection import _ForkingPickler

def run():
    def foo(x):
        pass
    _ForkingPickler.dumps(foo)  # multiprocessing custom pickler;
                                # same effect with pickle.dumps(foo)

run()
# Out:
Traceback (most recent call last):
...
AttributeError: Can't pickle local object 'run.<locals>.foo'

之所以看不到异常,是因为Pool在父级的酸洗任务期间已经开始捕获异常,并且仅当您在AsyncResult对象上调用.get()时才重新引发它们当您致电pool.apply_async().

The reason why you don't see an exception is, because Pool already starts catching exceptions during pickling tasks in the parent and only re-raises them when you call .get() on the AsyncResult object you immediately get when you call pool.apply_async().

这就是为什么(对于Python 2)您最好始终像这样使用它,即使您的目标函数不返回任何内容(仍然返回隐式None):

That's why (with Python 2) you better always use it like this, even if your target-function doesn't return anything (still returns implicit None):

    results = [pool.apply_async(foo, (i,)) for i in range(4)]
    # `pool.apply_async()` immediately returns AsyncResult (ApplyResult) object
    for res in results:
        res.get()

Pool.map()Pool.starmap()这样的非异步池方法在后台使用相同的(异步)低级函数,例如它们的异步同级,但是它们还会为您调用.get(),因此您将始终看到这些方法是一个例外.

Non-async Pool-methods like Pool.map() and Pool.starmap() use the same (asynchronous) low-level functions under the hood like their asynchronous siblings, but they additionally call .get() for you, so you will always see an exception with these methods.

Python 3具有用于异步Pool-method的error_callback-参数,您可以用来代替它来处理异常.

Python 3 has an error_callback-parameter for asynchronous Pool-methods you can use instead to handle exceptions.

这篇关于Pool.apply_async():未执行嵌套函数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆