How to pause processes in case they are consuming too much memory?


Problem Description

背景:我使用美国地质调查局提供的一组命令行实用程序来处理行星图像.其中有些是RAM占用量,甚至达到了极限(10s of GB). USGS表示,这只是他们的运行方式,没有任何计划来更好地管理RAM.我构建了一个Python包装器来处理文件列表,以调用不同的步骤来处理部分数据(例如,所有图像在一个滤色器中拍摄,所有图像在另一滤色器中拍摄,而所有在另一滤色器中拍摄,等等).由于已对多个列表和多个图像进行了处理,因此我将使用所有可能的CPU将其线程化,以更改可能需要两个月才能运行到一周的内容.目前,我不使用本机Python方法进行线程化;取而代之的是,我使用GNU Parallel(并使用os.system(")来调用并行函数,然后调用该函数),或者使用

Background: I process planetary imagery using a set of command-line utilities provided by the US Geological Survey. Some of them are RAM hogs, to the extreme (10s of GB). USGS says it's just the way they run and they don't have any plans to try to better manage the RAM. I built a Python wrapper to manipulate file lists to call the different steps to process the data in parts (such as all images taken in one color filter, all taken in another, and so on). Because things are done to multiple lists and multiple images, I thread it, using all the CPUs I can, so that work that might otherwise take two months runs in about a week. I don't use native Python methods to thread at the moment; instead I use GNU Parallel (calling parallel and then the function via os.system("")), or I use Pysis, which is a Python way to call and multithread the USGS software.
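As a concrete illustration of the GNU Parallel route mentioned above, here is a minimal sketch using subprocess.run instead of os.system (which avoids shell-quoting pitfalls). The tool name some_usgs_step and the file names are hypothetical placeholders, not the actual USGS utilities:

import subprocess

file_list = ['img_001.cub', 'img_002.cub', 'img_003.cub']  # hypothetical inputs

# GNU Parallel runs the command once per input file, at most 8 jobs
# at a time (-j 8); `{}` is Parallel's placeholder for each input.
subprocess.run(
    ['parallel', '-j', '8', 'some_usgs_step', '--input', '{}',
     ':::', *file_list],
    check=True,
)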


Problem: As noted, some steps, for some files, take a huge amount of RAM, and there is no way of knowing ahead of time which those might be. So I can get into a situation where, for some files, each process uses 200 MB and runs fine on a 16GB RAM machine with 8 cores, but then it might start to process other files where I get RAM creep, with each process using several GB, which with 8 processes on a 16GB RAM machine means RAM is compressed, swap space is used ... and that's if I'm lucky and the machine doesn't just lock up.


Solution? What I'm looking for is a way to monitor the RAM usage, say once a minute, by process name, and if I start to see RAM creep (e.g., 8 instances of a process each using over 2GB of RAM), I can pause all but one of those, let that one finish, un-pause another, let that finish, etc. until those 8 are done, then continue on with the rest of what might need to run for that step. Hopefully obviously, all this would be done in Python, not manually.


Is it possible to do that? If so, how?

Recommended Answer


You can use psutil.Process.suspend() to suspend execution of running processes which exceed a given memory threshold. The monitoring part is just repeatedly comparing psutil.Process().memory_info().rss ("Resident Set Size") of running processes with your given threshold. How you then schedule further processing is up to you.
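The question asks for monitoring by process name; before the child-process example below, here is a minimal polling sketch of that variant. The process name, the 2 GiB threshold, and the one-minute poll interval are assumptions taken from the question, and resuming is left out:

import time

import psutil

PROC_NAME = 'some_usgs_step'  # hypothetical name of the RAM-hungry tool
MAX_RSS_MIB = 2048            # per-process threshold (2 GiB)

while True:
    offenders = [
        p for p in psutil.process_iter(['name', 'memory_info', 'status'])
        if p.info['name'] == PROC_NAME
        and p.info['status'] != psutil.STATUS_STOPPED  # skip already-paused
        and p.info['memory_info'].rss / 2 ** 20 > MAX_RSS_MIB
    ]
    for p in offenders[1:]:  # keep one offender running, pause the rest
        try:
            p.suspend()  # sends SIGSTOP on Unix
        except psutil.NoSuchProcess:
            pass  # process exited between the scan and the suspend
    time.sleep(60)  # poll once a minute, as described in the question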


In the example below I'm suspending culprit-processes until the rest is finished, then resume the once suspended processes one by one. This is meant to be a simplistic approach to show the general mechanism.

import time
import random
from threading import Thread
from multiprocessing import Process, active_children

import psutil


def format_mib(mem_bytes):
    """Format bytes into mebibyte-string."""
    return f'{mem_bytes / 2 ** 20:.2f} MiB'


def f(append_length):
    """Main function in child-process. Appends random floats to list."""
    p = psutil.Process()
    li = []
    for i in range(10):
        li.extend([random.random() for _ in range(append_length)])
        print(f'i: {i} | pid: {p.pid} | '
              f'{format_mib(p.memory_full_info().rss)}')
        time.sleep(2)


def monitored(running_processes, max_mib):
    """Monitor memory usage for running processes.
    Suspend execution for processes surpassing `max_mib` and complete
    one by one after behaving processes have finished.
    """
    running_processes = [psutil.Process(pid=p.pid) for p in running_processes]
    suspended_processes = []

    while running_processes:
        active_children()  # Joins all finished processes.
        # Without it, p.is_running() below on Unix would not return
        # `False` for finished processes.
        actual_processes = running_processes.copy()
        for p in actual_processes:
            if not p.is_running():
                running_processes.remove(p)
                print(f'removed finished process: {p}')
            else:
                if p.memory_info().rss / 2 ** 20 > max_mib:
                    print(f'suspending process: {p}')
                    p.suspend()
                    running_processes.remove(p)
                    suspended_processes.append(p)

        time.sleep(1)

    for p in suspended_processes:
        print(f'\nresuming process: {p}')
        p.resume()
        p.wait()


if __name__ == '__main__':

    MAX_MiB = 200

    append_lengths = [100000, 500000, 1000000, 2000000, 300000]
    processes = [Process(target=f, args=(append_length,))
                 for append_length in append_lengths]

    for p in processes:
        p.start()

    m = Thread(target=monitored, args=(processes, MAX_MiB))
    m.start()
    m.join()


Example output (shortened) with two processes getting suspended for exceeding the 200 MiB threshold and resumed after the behaving processes have finished:

i: 0 | pid: 17997 | 13.53 MiB
i: 0 | pid: 18001 | 19.70 MiB
i: 0 | pid: 17998 | 25.88 MiB
i: 0 | pid: 17999 | 41.32 MiB
i: 0 | pid: 18000 | 72.21 MiB
...
i: 2 | pid: 17997 | 20.84 MiB
i: 2 | pid: 18001 | 42.02 MiB
i: 2 | pid: 17998 | 60.56 MiB
i: 2 | pid: 17999 | 103.36 MiB
i: 2 | pid: 18000 | 215.70 MiB
suspending process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 17997 | 23.93 MiB
i: 3 | pid: 18001 | 47.75 MiB
i: 3 | pid: 17998 | 76.00 MiB
i: 3 | pid: 17999 | 141.59 MiB
...
i: 5 | pid: 17997 | 30.11 MiB
i: 5 | pid: 18001 | 68.24 MiB
i: 5 | pid: 17998 | 107.23 MiB
i: 5 | pid: 17999 | 203.52 MiB
suspending process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17997 | 33.19 MiB
i: 6 | pid: 18001 | 77.49 MiB
i: 6 | pid: 17998 | 122.59 MiB
...
i: 9 | pid: 17997 | 42.47 MiB
i: 9 | pid: 18001 | 105.68 MiB
i: 9 | pid: 17998 | 168.96 MiB
removed finished process: psutil.Process(pid=17997, status='terminated')
removed finished process: psutil.Process(pid=17998, status='terminated')
removed finished process: psutil.Process(pid=18001, status='terminated')

resuming process: psutil.Process(pid=18000, name='python', started='18:20:09')
i: 3 | pid: 18000 | 277.46 MiB
i: 4 | pid: 18000 | 339.22 MiB
i: 5 | pid: 18000 | 400.84 MiB
...
i: 9 | pid: 18000 | 648.00 MiB

resuming process: psutil.Process(pid=17999, name='python', started='18:20:09')
i: 6 | pid: 17999 | 234.55 MiB
...
i: 9 | pid: 17999 | 327.31 MiB


Process finished with exit code 0



I think my only remaining question from working through this is, how can I get it to spawn only a certain number of threads [sic!] at a time, as stuff completes add remaining ones, and then do all the suspended ones at the end?


I expanded the code above to enable starting new processes as old ones finish, with the maximum number of running processes set to the number of cores. I also refactored it into a class, since it otherwise would start to get messy with all that necessary state to manage. In the code below I'm using the names "tasks" and "processes" interchangeably for brevity. Note the changed start method for processes and the accompanying comment in the code.

import time
import random
from threading import Thread
from collections import deque
from multiprocessing import Process, active_children, set_start_method

import psutil

# `def format_mib` and `def f` from above unchanged...

class TaskProcessor(Thread):
    """Processor class which monitors memory usage for running
    tasks (processes). Suspends execution for tasks surpassing
    `max_mib` and completes them one by one, after behaving
    tasks have finished.
    """
    def __init__(self, n_cores, max_mib, tasks):
        super().__init__()
        self.n_cores = n_cores
        self.max_mib = max_mib  # memory threshold
        self.tasks = deque(tasks)

        self._running_tasks = []
        self._suspended_tasks = []

    def run(self):
        """Main-function in new thread."""
        self._update_running_tasks()
        self._monitor_running_tasks()
        self._process_suspended_tasks()

    def _update_running_tasks(self):
        """Start new tasks if we have less running tasks than cores."""
        while len(self._running_tasks) < self.n_cores and len(self.tasks) > 0:
            p = self.tasks.popleft()
            p.start()
            # for further process-management we here just need the
            # psutil.Process wrapper
            self._running_tasks.append(psutil.Process(pid=p.pid))
            print(f'Started process: {self._running_tasks[-1]}')

    def _monitor_running_tasks(self):
        """Monitor running tasks. Replace completed tasks and suspend tasks
        which exceed the memory threshold `self.max_mib`.
        """
        # loop while we have running or non-started tasks
        while self._running_tasks or self.tasks:
            active_children()  # Joins all finished processes.
            # Without it, p.is_running() below on Unix would not return
            # `False` for finished processes.
            self._update_running_tasks()
            actual_tasks = self._running_tasks.copy()

            for p in actual_tasks:
                if not p.is_running():  # process has finished
                    self._running_tasks.remove(p)
                    print(f'Removed finished process: {p}')
                else:
                    if p.memory_info().rss / 2 ** 20 > self.max_mib:
                        p.suspend()
                        self._running_tasks.remove(p)
                        self._suspended_tasks.append(p)
                        print(f'Suspended process: {p}')

            time.sleep(1)

    def _process_suspended_tasks(self):
        """Resume processing of suspended tasks."""
        for p in self._suspended_tasks:
            print(f'\nResuming process: {p}')
            p.resume()
            p.wait()


if __name__ == '__main__':

    # Forking (default on Unix-y systems) an already multithreaded process is
    # error-prone. Since we intend to start processes after we are already
    # multithreaded, we switch to another start-method.
    set_start_method('spawn')  # or 'forkserver' (a bit faster start up) if available

    MAX_MiB = 200
    N_CORES = 2

    append_lengths = [100000, 500000, 1000000, 2000000, 300000]
    tasks = [Process(target=f, args=(append_length,))
             for append_length in append_lengths]

    tp = TaskProcessor(n_cores=N_CORES, max_mib=MAX_MiB, tasks=tasks)
    tp.start()
    tp.join()

Example output (shortened):

Started process: psutil.Process(pid=9422, name='python', started='13:45:53')
Started process: psutil.Process(pid=9423, name='python', started='13:45:53')
i: 0 | pid: 9422 | 18.95 MiB
i: 0 | pid: 9423 | 31.45 MiB
...
i: 9 | pid: 9422 | 47.36 MiB
i: 9 | pid: 9423 | 175.41 MiB
Removed finished process: psutil.Process(pid=9422, status='terminated')
Removed finished process: psutil.Process(pid=9423, status='terminated')
Started process: psutil.Process(pid=9445, name='python', started='13:46:15')
Started process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 0 | pid: 9445 | 46.86 MiB
i: 0 | pid: 9446 | 77.74 MiB
...
i: 2 | pid: 9445 | 117.41 MiB
i: 2 | pid: 9446 | 220.99 MiB
Suspended process: psutil.Process(pid=9446, name='python', started='13:46:15')
Started process: psutil.Process(pid=9450, name='python', started='13:46:21')
i: 0 | pid: 9450 | 25.16 MiB
i: 3 | pid: 9445 | 148.29 MiB
i: 1 | pid: 9450 | 36.47 MiB
i: 4 | pid: 9445 | 179.17 MiB
i: 2 | pid: 9450 | 45.74 MiB
i: 5 | pid: 9445 | 211.14 MiB
Suspended process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 3 | pid: 9450 | 55.00 MiB
...
i: 9 | pid: 9450 | 110.62 MiB
Removed finished process: psutil.Process(pid=9450, status='terminated')

Resuming process: psutil.Process(pid=9446, name='python', started='13:46:15')
i: 3 | pid: 9446 | 282.75 MiB
...
i: 9 | pid: 9446 | 655.35 MiB

Resuming process: psutil.Process(pid=9445, name='python', started='13:46:15')
i: 6 | pid: 9445 | 242.12 MiB
...
i: 9 | pid: 9445 | 334.88 MiB

Process finished with exit code 0
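One gap to note when adapting this to the original use case: the example's worker f allocates memory inside the Python child itself, whereas the USGS tools run as separate executables, whose memory would not show up in the wrapper process's own RSS. A sketch of one way to bridge that is psutil.Popen, which wraps a spawned command in the psutil.Process API so the same monitor/suspend/resume/wait calls from the examples above apply directly (the command and file name are hypothetical placeholders):

import psutil

# psutil.Popen behaves like subprocess.Popen but additionally exposes
# the psutil.Process API on the spawned command's pid.
proc = psutil.Popen(['some_usgs_step', '--input', 'img_001.cub'])

# Inside a monitoring loop like the ones above:
if proc.memory_info().rss / 2 ** 20 > 2048:  # same threshold check
    proc.suspend()   # pause the external tool (SIGSTOP on Unix)
    # ... let other work finish ...
    proc.resume()    # continue it later

proc.wait()  # join, as in the examples above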

