Multiprocessing Pool hangs if child process killed


Problem Description

I launched a pool of worker processes and submitted a bunch of tasks. The system ran low on memory and the oomkiller killed one of the worker processes. The parent process just hung there waiting for the tasks to finish and never returned.

Here's a runnable example that reproduces the problem. Instead of waiting for oomkiller to kill one of the worker processes, I find the process ids of all the worker processes and tell the first task to kill that process. (The call to ps won't work on all operating systems.)

import os
import signal
from multiprocessing import Pool
from random import choice
from subprocess import run, PIPE
from time import sleep


def run_task(task):
    target_process_id, n = task
    print(f'Processing item {n} in process {os.getpid()}.')
    delay = n + 1
    sleep(delay)
    if n == 0:
        print(f'Item {n} killing process {target_process_id}.')
        os.kill(target_process_id, signal.SIGKILL)
    else:
        print(f'Item {n} finished.')
    return n, delay


def main():
    print('Starting.')
    pool = Pool()

    ps_output = run(['ps', '-opid', '--no-headers', '--ppid', str(os.getpid())],
                    stdout=PIPE, encoding='utf8')
    child_process_ids = [int(line) for line in ps_output.stdout.splitlines()]
    target_process_id = choice(child_process_ids[1:-1])

    tasks = ((target_process_id, i) for i in range(10))
    for n, delay in pool.imap_unordered(run_task, tasks):
        print(f'Received {delay} from item {n}.')

    print('Closing.')
    pool.close()
    pool.join()
    print('Done.')


if __name__ == '__main__':
    main()
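
Note: the ps call above is Linux-specific. As an aside (not part of the original question), a more portable way to collect the worker process ids is multiprocessing.active_children(), assuming the pool workers are the only multiprocessing children of this process:

import multiprocessing


def worker_pids():
    # active_children() returns the live multiprocessing.Process children of
    # this process, which includes the Pool's worker processes.
    return [child.pid for child in multiprocessing.active_children()]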

When I run that example on a system with eight CPUs, I see this output:

Starting.
Processing item 0 in process 303.
Processing item 1 in process 304.
Processing item 2 in process 305.
Processing item 3 in process 306.
Processing item 4 in process 307.
Processing item 5 in process 308.
Processing item 6 in process 309.
Processing item 7 in process 310.
Item 0 killing process 308.
Processing item 8 in process 303.
Received 1 from item 0.
Processing item 9 in process 315.
Item 1 finished.
Received 2 from item 1.
Item 2 finished.
Received 3 from item 2.
Item 3 finished.
Received 4 from item 3.
Item 4 finished.
Received 5 from item 4.
Item 6 finished.
Received 7 from item 6.
Item 7 finished.
Received 8 from item 7.
Item 8 finished.
Received 9 from item 8.
Item 9 finished.
Received 10 from item 9.

You can see that item 5 never returns, and the pool just waits forever.
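
One partial mitigation while staying on multiprocessing.Pool is to poll the result iterator with a timeout instead of blocking forever. This only bounds the wait; it does not identify the dead worker. A minimal sketch (my addition, not part of the original question) that would replace the for loop in main() above, with the 30-second timeout chosen arbitrarily:

import multiprocessing

# Replaces `for n, delay in pool.imap_unordered(run_task, tasks):` in main().
results = pool.imap_unordered(run_task, tasks)
while True:
    try:
        # next(timeout=...) raises multiprocessing.TimeoutError if no result
        # arrives in time, instead of waiting forever.
        n, delay = results.next(timeout=30)
    except StopIteration:
        break  # all tasks have finished
    except multiprocessing.TimeoutError:
        print('No result for 30 seconds; a worker may have been killed.')
        break
    print(f'Received {delay} from item {n}.')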

How can I get the parent process to notice when a child process is killed?

Recommended Answer

This problem is described in Python bug 9205, but they decided to fix it in the concurrent.futures module instead of in the multiprocessing module. In order to take advantage of the fix, switch to the newer process pool.

import os
import signal
from concurrent.futures.process import ProcessPoolExecutor
from random import choice
from subprocess import run, PIPE
from time import sleep


def run_task(task):
    target_process_id, n = task
    print(f'Processing item {n} in process {os.getpid()}.')
    delay = n + 1
    sleep(delay)
    if n == 0:
        print(f'Item {n} killing process {target_process_id}.')
        os.kill(target_process_id, signal.SIGKILL)
    else:
        print(f'Item {n} finished.')
    return n, delay


def main():
    print('Starting.')
    pool = ProcessPoolExecutor()

    pool.submit(lambda: None)  # Force the pool to launch some child processes.
    ps_output = run(['ps', '-opid', '--no-headers', '--ppid', str(os.getpid())],
                    stdout=PIPE, encoding='utf8')
    child_process_ids = [int(line) for line in ps_output.stdout.splitlines()]
    target_process_id = choice(child_process_ids[1:-1])

    tasks = ((target_process_id, i) for i in range(10))
    for n, delay in pool.map(run_task, tasks):
        print(f'Received {delay} from item {n}.')

    print('Closing.')
    pool.shutdown()
    print('Done.')


if __name__ == '__main__':
    main()

Now when you run it, you get a clear error message.

Starting.
Processing item 0 in process 549.
Processing item 1 in process 550.
Processing item 2 in process 552.
Processing item 3 in process 551.
Processing item 4 in process 553.
Processing item 5 in process 554.
Processing item 6 in process 555.
Processing item 7 in process 556.
Item 0 killing process 556.
Processing item 8 in process 549.
Received 1 from item 0.
Traceback (most recent call last):
  File "/home/don/.config/JetBrains/PyCharm2020.1/scratches/scratch2.py", line 42, in <module>
    main()
  File "/home/don/.config/JetBrains/PyCharm2020.1/scratches/scratch2.py", line 33, in main
    for n, delay in pool.map(run_task, tasks):
  File "/usr/lib/python3.7/concurrent/futures/process.py", line 483, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 598, in result_iterator
    yield fs.pop().result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
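
If the parent should report the failure instead of crashing, the BrokenProcessPool exception can be caught around the loop that consumes results. A minimal sketch (my addition, not part of the original answer), replacing the for loop in main() above:

from concurrent.futures.process import BrokenProcessPool

# Replaces `for n, delay in pool.map(run_task, tasks):` in main().
try:
    for n, delay in pool.map(run_task, tasks):
        print(f'Received {delay} from item {n}.')
except BrokenProcessPool:
    print('A worker process was killed; results are incomplete.')

Once the pool is broken, any remaining or newly submitted futures also fail with BrokenProcessPool, so the executor should be shut down and a new one created before submitting more work.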
