Using Concurrent Futures without running out of RAM

Problem description

I'm doing some file parsing that is a CPU-bound task. No matter how many files I throw at the process, it uses no more than about 50MB of RAM. The task is parallelisable, and I've set it up to use concurrent.futures below to parse each file in a separate process:

    from concurrent import futures
    with futures.ProcessPoolExecutor(max_workers=6) as executor:
        # A dictionary mapping each future (key) to its filename (value)
        jobs = {}

        # Loop through the files, running the parse function for each one (passing it the filename).
        # The results can come back in any order.
        for this_file in files_list:
            job = executor.submit(parse_function, this_file, **parser_variables)
            jobs[job] = this_file

        # Get the completed jobs whenever they are done
        for job in futures.as_completed(jobs):

            # Fetch the result (job.result()) and the filename it came from (jobs[job])
            results_list = job.result()
            this_file = jobs[job]

            # delete the finished future from the dict, as we don't need to keep it
            del jobs[job]

            # post-processing (putting the results into a database)
            post_process(this_file, results_list)

The problem is that when I run this using futures, RAM usage rockets, and before long I've run out and Python has crashed. This is probably in large part because the results from parse_function are several MB each. Once the results have been through post_process, the application has no further need of them. As you can see, I'm using del jobs[job] to clear items out of jobs, but this has made no difference: memory usage remains high and seems to increase at the same rate.

I've also confirmed that it's not because it's waiting on the post_process function, by using only a single process plus throwing in a time.sleep(1).

There's nothing in the futures docs about memory management, and while a brief search indicates it has come up before in real-world applications of futures (Clear memory in Python loop, and http://grokbase.com/t/python/python-list/1458ss5etz/real-world-use-of-concurrent-futures), the answers don't translate to my use case: they're all concerned with timeouts and the like.

So, how do you use Concurrent futures without running out of RAM? (Python 3.5)

Recommended answer

I'll take a shot (might be a wrong guess...).

You might need to submit your work bit by bit, since on each submit you're making a copy of parser_variables, which may end up chewing through your RAM.
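The per-submit copying is easy to observe: ProcessPoolExecutor pickles the arguments of every submitted task so they can be sent to the child process, so each pending submit holds another serialised copy of parser_variables. A minimal sketch (using a hypothetical stand-in dict, not the asker's actual variables) to estimate how much data each submit queues up:

```python
import pickle

# Hypothetical stand-in for parser_variables: a largish lookup structure
parser_variables = {"lookup_table": list(range(100_000))}

# ProcessPoolExecutor pickles the args/kwargs of every submitted task,
# so N pending submits hold roughly N serialised copies of this dict.
bytes_per_submit = len(pickle.dumps(parser_variables))
print(f"~{bytes_per_submit} bytes queued per submit")
```

If that number is large, capping the number of pending submits (as in the code below) directly caps the serialised copies held in memory at once.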

Here is working code with "<----" on the interesting parts

    from concurrent import futures

    MAX_JOBS_IN_QUEUE = 12  # cap on pending jobs; tune to taste  <----

    with futures.ProcessPoolExecutor(max_workers=6) as executor:
        # A dictionary mapping each future (key) to its filename (value)
        jobs = {}

        # Loop through the files, running the parse function for each one (passing it the filename).
        # The results can come back in any order.
        files_left = len(files_list)  # <----
        files_iter = iter(files_list)  # <----

        while files_left:
            for this_file in files_iter:
                job = executor.submit(parse_function, this_file, **parser_variables)
                jobs[job] = this_file
                if len(jobs) > MAX_JOBS_IN_QUEUE:
                    break  # limit the job submissions for now  <----

            # Get the completed jobs whenever they are done
            for job in futures.as_completed(jobs):
                files_left -= 1  # one down - many to go...  <----

                # Fetch the result (job.result()) and the filename it came from (jobs[job])
                results_list = job.result()
                this_file = jobs[job]

                # delete the finished future from the dict, as we don't need to keep it
                del jobs[job]

                # post-processing (putting the results into a database)
                post_process(this_file, results_list)
                break  # give a chance to add more jobs  <----
