Python多处理:在第一个子错误时中止映射 [英] Python multiprocessing: abort map on first child error

查看:55
本文介绍了Python多处理:在第一个子错误时中止映射的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

其中一个孩子中止和/或引发异常时中止多处理的正确方法是什么?

What's the proper way of aborting multiprocessing when one of the child aborts and/or throw an Exception?

我发现了各种各样的问题(通用多处理错误处理如何在出现异常时关闭多处理池但没有答案,...),但是对于如何在子异常上停止多处理并没有明确的答案.

I found various questions around that (generic multiprocessing error handling, how to close multiprocessing pool on exception but without answer, ...), but no clear answer on how to stop multiprocessing on child exception.

例如,我需要以下代码:

For instance, I expect the following code:

def f(x):
    sleep(x)
    print(f"f({x})")
    return 1.0 / (x - 2)


def main():
    with Pool(4) as p:
        try:
            r = p.map(f, range(7))
        except Exception as e:
            print(f"oops: {e}")
            p.close()
            p.terminate()
    print("end")


if __name__ == '__main__':
    main()

要输出:

f(0)
f(1)
f(2)
oops: float division by zero
end

相反,它将在检测/处理异常之前对所有项目应用f函数:

Instead, it applies f function on all items before detecting/handling the exception:

f(0)
f(1)
f(2)
f(4)
f(3)
f(5)
f(6)
oops: float division by zero
end

没有任何方法可以直接捕获异常吗?

Isn't there any way to catch the exception directly?

推荐答案

我认为您需要apply_async,因此您可以对每个结果进行操作,而不是对累积结果进行操作. pool.apply_async提供了一个error_callback参数,可用于注册错误处理程序. apply_async没有阻止,因此您需要join()池.我还使用标志terminated来了解何时可以正常处理结果,以防万一没有异常发生.

I think you're going to need apply_async for this, so you can act upon every single result instead of the cumulative result. pool.apply_async offers an error_callback parameter you can use to register your error-handler. apply_async is not blocking, so you'll need to join() the pool. I'm also using a flag terminated to know when results can be processed normally in case no exception occured.

from time import sleep
from multiprocessing import Pool

def f(x):
    sleep(x)
    print(f"f({x})")
    return 1.0 / (x - 2)

def on_error(e):
    global terminated
    terminated = True
    pool.terminate()
    print(f"oops:{e}")


def main():
    global pool
    global terminated

    terminated = False

    pool = Pool(4)
    results = [pool.apply_async(f, (x,), error_callback=on_error)
               for x in range(7)]
    pool.close()
    pool.join()

    if not terminated:
        for r in results:
            print(r.get())

    print("end")


if __name__ == '__main__':
    main()

这篇关于Python多处理:在第一个子错误时中止映射的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆