python running coverage on never ending process


Question

I have a multi-process web server whose processes never end, and I would like to check code coverage on the whole project in a live environment (not only from tests).

The problem is that, since the processes never end, I don't have a good place to put the cov.start(), cov.stop(), and cov.save() hooks.

Therefore, I thought about spawning a thread that, in an infinite loop, saves and combines the coverage data and then sleeps for some time. However, this approach doesn't work: the coverage report comes out empty, except for the sleep line.

I would be happy to receive any ideas about how to get coverage of my code, or any advice about why my idea doesn't work. Here is a snippet of my code:

import coverage
cov = coverage.Coverage()
import time
import threading
import os

class CoverageThread(threading.Thread):
    _kill_now = False
    _sleep_time = 2

    @classmethod
    def exit_gracefully(cls):
        cls._kill_now = True

    def sleep_some_time(self):
        time.sleep(CoverageThread._sleep_time)

    def run(self):
        # Periodically stop, combine, and save the coverage data.
        while True:
            cov.start()
            self.sleep_some_time()
            cov.stop()
            if os.path.exists('.coverage'):
                cov.combine()
            cov.save()
            if self._kill_now:
                break
        cov.stop()
        if os.path.exists('.coverage'):
            cov.combine()
        cov.save()
        cov.html_report(directory="coverage_report_data.html")
        print("End of the program. I was killed gracefully :)")

Answer

Apparently, it is not possible to control coverage very well with multiple threads. Once different threads have started, stopping the Coverage object stops all coverage, and start only restarts it in the "starting" thread. So your code basically stops coverage after 2 seconds for every thread other than the CoverageThread.
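To illustrate what that means in practice, here is a minimal sketch (my own addition, not from the original answer; it assumes coverage 4.x and only demonstrates the behaviour described above):

import threading
import time
import coverage

cov = coverage.Coverage()
cov.start()

def worker():
    # Lines executed here stop being recorded as soon as another thread
    # calls cov.stop(); the later cov.start() does not re-trace this thread.
    for _ in range(5):
        time.sleep(0.5)

t = threading.Thread(target=worker)
t.start()

time.sleep(1)
cov.stop()    # stops measurement in every thread, including `worker`
cov.start()   # re-installs the tracer only in the calling (main) thread
t.join()

cov.stop()
cov.save()
cov.report()  # the worker's later iterations are missing from the report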

I played a bit with the API, and it is possible to access the measurements without stopping the Coverage object. So you could launch a thread that saves the coverage data periodically, using the API. A first implementation would be something like this:

import os
import threading
from time import sleep
from coverage import Coverage
from coverage.data import CoverageData, CoverageDataFiles
from coverage.files import abs_file

cov = Coverage(config_file=True)
cov.start()


def get_data_dict(d):
    """Return a dict like d, but with keys modified by `abs_file` and
    remove the copied elements from d.
    """
    res = {}
    keys = list(d.keys())
    for k in keys:
        a = {}
        lines = list(d[k].keys())
        for l in lines:
            v = d[k].pop(l)
            a[l] = v
        res[abs_file(k)] = a
    return res


class CoverageLoggerThread(threading.Thread):
    _kill_now = False
    _delay = 2

    def __init__(self, main=True):
        self.main = main
        self._data = CoverageData()
        self._fname = cov.config.data_file
        self._suffix = None
        self._data_files = CoverageDataFiles(basename=self._fname,
                                             warn=cov._warn)
        self._pid = os.getpid()
        super(CoverageLoggerThread, self).__init__()

    def shutdown(self):
        self._kill_now = True

    def combine(self):
        aliases = None
        if cov.config.paths:
            from coverage.aliases import PathAliases
            aliases = PathAliases()
            for paths in cov.config.paths.values():
                result = paths[0]
                for pattern in paths[1:]:
                    aliases.add(pattern, result)

        self._data_files.combine_parallel_data(self._data, aliases=aliases)

    def export(self, new=True):
        cov_report = cov
        if new:
            cov_report = Coverage(config_file=True)
            cov_report.load()
        self.combine()
        self._data_files.write(self._data)
        cov_report.data.update(self._data)
        cov_report.html_report(directory="coverage_report_data.html")
        cov_report.report(show_missing=True)

    def _collect_and_export(self):
        new_data = get_data_dict(cov.collector.data)
        if cov.collector.branch:
            self._data.add_arcs(new_data)
        else:
            self._data.add_lines(new_data)
        self._data.add_file_tracers(get_data_dict(cov.collector.file_tracers))
        self._data_files.write(self._data, self._suffix)

        if self.main:
            self.export()

    def run(self):
        while True:
            sleep(CoverageLoggerThread._delay)
            if self._kill_now:
                break

            self._collect_and_export()

        cov.stop()

        if not self.main:
            self._collect_and_export()
            return

        self.export(new=False)
        print("End of the program. I was killed gracefully :)")

A more stable version can be found in this GIST. This code basically grabs the information collected by the collector without stopping it. The get_data_dict function takes the dictionary in Coverage.collector and pops the available data, which should be safe enough that you don't lose any measurement.
The report files get updated every _delay seconds.
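For completeness, a minimal usage sketch for the main process (my own addition; it assumes the code above is saved as a hypothetical coverage_logger.py module, and serve_forever is a placeholder for your real workload):

from coverage_logger import CoverageLoggerThread

logger_thread = CoverageLoggerThread()
logger_thread.start()
try:
    serve_forever()           # placeholder for the never-ending server loop
finally:
    logger_thread.shutdown()  # ask the logger to stop...
    logger_thread.join()      # ...and wait for the final export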

But if you have multiple processes running, you need some extra effort to make sure every process runs the CoverageLoggerThread. This is the patch_multiprocessing function, monkey-patched from the coverage monkey patch...
The code is in the GIST. It basically replaces the original Process with a custom process, which starts the CoverageLoggerThread just before running the run method and joins the thread at the end of the process. The script main.py lets you launch different tests with threads and processes.
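The GIST contains the full patch; the rough shape of the idea (my own simplified sketch, not the exact GIST code) is something like this:

import multiprocessing

_OriginalProcess = multiprocessing.Process

class _CoverageProcess(_OriginalProcess):
    def run(self):
        # Start a logger thread in the child just before the target runs,
        # and join it once the target has returned.
        from coverage_logger import CoverageLoggerThread  # hypothetical module name
        logger = CoverageLoggerThread(main=False)
        logger.start()
        try:
            super(_CoverageProcess, self).run()
        finally:
            logger.shutdown()
            logger.join()

def patch_multiprocessing():
    # Any code that creates multiprocessing.Process after this call
    # will get the coverage-logging variant instead.
    multiprocessing.Process = _CoverageProcess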

There are a few drawbacks to this code that you need to be careful of:

  • It is a bad idea to use the combine function concurrently, as it performs concurrent read/write/delete access to the .coverage.* files. This means the export function is not entirely safe. It should be all right, since the data is replicated multiple times, but I would do some testing before using it in production.

  • Once the data has been exported, it stays in memory, so if the code base is huge it can eat some resources. It is possible to dump all the data and reload it, but I assumed that if you want to log every 2 seconds, you do not want to reload all the data every time. If you go with a delay of minutes, I would create a new _data every time, using CoverageData.read_file to reload the previous coverage state for this process.

  • The custom process will wait for _delay before finishing, since we join the CoverageLoggerThread at the end of the process. So if you have a lot of quick processes, you will want to increase the granularity of the sleep in order to detect the end of the process more quickly. It just needs a custom sleep loop that breaks on _kill_now (see the sketch after this list).
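A possible shape for such a loop (my own sketch, not part of the GIST) is a small helper that sleeps in short steps and bails out as soon as _kill_now is set:

    def _interruptible_sleep(self, total, step=0.1):
        """Sleep up to `total` seconds, waking early once _kill_now is set."""
        waited = 0.0
        while waited < total and not self._kill_now:
            sleep(step)
            waited += step

run() would then call self._interruptible_sleep(CoverageLoggerThread._delay) instead of the plain sleep(CoverageLoggerThread._delay).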

Let me know if this helps you in some way, or if it is possible to improve this gist.

EDIT: It seems you do not need to monkey-patch the multiprocessing module to start a logger automatically. Using a .pth file in your Python install, you can use an environment variable to start your logger automatically on new processes:

# Content of coverage.pth in your site-packages folder
import os
if "COVERAGE_LOGGER_START" in os.environ:
    import atexit
    from coverage_logger import CoverageLoggerThread
    thread_cov = CoverageLoggerThread(main=False)
    thread_cov.start()
    def close_cov():
        thread_cov.shutdown()
        thread_cov.join()
    atexit.register(close_cov)

You can then start your coverage logging with COVERAGE_LOGGER_START=1 python main.py
