Python Multiprocessing storing data until further call in each process


Problem Description

I have a large object of a type that cannot be shared between processes. It has methods to instantiate it and to work on its data.

The way I'm currently doing it is to first instantiate the object in the main parent process and then pass it around to subprocesses when some event happens. The problem is that whenever the subprocesses run, they copy the object in memory every time, which takes a while. I want to store it in memory that is available only to them, so that they don't have to copy it each time they call that object's function.

How would I store an object just for that process's own use?

Edit: Code

from multiprocessing import Process

class MultiQ:
    def __init__(self):
        self.pred = instantiate_predict()  # here I instantiate the big object

    def enq_essay(self, essay):
        p = Process(target=self.compute_results, args=(essay,))
        p.start()

    def compute_results(self, essay):
        # computation in the large object that doesn't modify the object
        predictions = self.pred.predict_fields(essay)

This copies the large object in memory every time. I am trying to avoid that.
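
One way to get per-process storage (a sketch of the standard multiprocessing.Pool initializer idiom, not code from the original post; instantiate_predict and predict_fields are the poster's names from the snippet above) is to build the large object once in each worker and keep it in a module-level global, so every later task in that worker reuses its own copy:

import multiprocessing as mp

_pred = None  # per-process global, populated once per worker

def init_worker():
    # Runs once in every worker process, right after it starts.
    global _pred
    _pred = instantiate_predict()  # the poster's factory function

def compute_results(essay):
    # Reuses this worker's own copy; nothing large is pickled per call.
    return _pred.predict_fields(essay)

if __name__ == '__main__':
    pool = mp.Pool(processes=mp.cpu_count(), initializer=init_worker)
    results = pool.map(compute_results, ["first essay", "second essay"])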

A short code sample that runs on the 20 newsgroups data:

import sklearn.feature_extraction.text as ftext
import sklearn.linear_model as lm
import multiprocessing as mp
import logging
import os
import numpy as np
import cPickle as pickle


def get_20newsgroups_fnames():
    all_files = []
    for i, (root, dirs, files) in enumerate(os.walk("/home/roman/Desktop/20_newsgroups/")):
        if i > 0:
            all_files.extend([os.path.join(root, fname) for fname in files])
    return all_files

documents = [unicode(open(f).read(), errors="ignore") for f in get_20newsgroups_fnames()]
logger = mp.get_logger()
formatter = logging.Formatter('%(asctime)s: [%(processName)12s] %(message)s',
                              datefmt = '%H:%M:%S')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
mp._log_to_stderr = True


def free_memory():
    """
    Return free memory available, including buffer and cached memory
    """
    total = 0
    with open('/proc/meminfo', 'r') as f:
        for line in f:
            line = line.strip()
            if any(line.startswith(field) for field in ('MemFree', 'Buffers', 'Cached')):
                field, amount, unit = line.split()
                amount = int(amount)
                if unit != 'kB':
                    raise ValueError(
                        'Unknown unit {u!r} in /proc/meminfo'.format(u=unit))
                total += amount
    return total


def predict(large_object, essay="this essay will be predicted"):
    """this method copies the large object in memory which is what im trying to avoid"""
    vectorized_essay = large_object[0].transform(essay)
    large_object[1].predict(vectorized_essay)
    report_memory("done")


def train_and_model():
    """this is very similar to the instantiate_predict method from my first code sample"""
    tfidf_vect = ftext.TfidfVectorizer()
    X = tfidf_vect.fit_transform(documents)
    y = np.random.random_integers(0,1,19997)
    model = lm.LogisticRegression()
    model.fit(X, y)
    return (tfidf_vect, model)


def report_memory(label):
    f = free_memory()
    logger.warn('{l:<25}: {f}'.format(f=f, l=label))

def dump_large_object(large_object):
    # protocol 2 is a binary pickle format, so use binary file modes
    with open("large_object.obj", "wb") as f:
        pickle.dump(large_object, f, protocol=2)

def load_large_object():
    with open("large_object.obj", "rb") as f:
        return pickle.load(f)

if __name__ == '__main__':
    report_memory('Initial')
    tfidf_vect, model = train_and_model()
    report_memory('After train_and_model')
    large_object = (tfidf_vect, model)
    procs = [mp.Process(target=predict, args=(large_object,))
             for i in range(mp.cpu_count())]
    report_memory('After Process')
    for p in procs:
        p.start()
    report_memory('After p.start')
    for p in procs:
        p.join()
    report_memory('After p.join')

Output 1:

19:01:39: [ MainProcess] Initial                  : 26585728
19:01:51: [ MainProcess] After train_and_model    : 25958924
19:01:51: [ MainProcess] After Process            : 25958924
19:01:51: [ MainProcess] After p.start            : 25925908
19:01:51: [   Process-1] done                     : 25725524
19:01:51: [   Process-2] done                     : 25781076
19:01:51: [   Process-4] done                     : 25789880
19:01:51: [   Process-3] done                     : 25802032
19:01:51: [ MainProcess] After p.join             : 25958272
roman@ubx64:$ du -h large_object.obj
4.6M    large_object.obj

So maybe the large object is not even that large, and my problem was the memory usage of the tfidf vectorizer's transform method.

Now if I change the main method to this:

report_memory('Initial')
large_object = load_large_object()
report_memory('After loading the object')
procs = [mp.Process(target=predict, args=(large_object,))
         for i in range(mp.cpu_count())]
report_memory('After Process')
for p in procs:
    p.start()
report_memory('After p.start')
for p in procs:
    p.join()
report_memory('After p.join')

I get the following results: Output 2:

20:07:23: [ MainProcess] Initial                  : 26578356
20:07:23: [ MainProcess] After loading the object : 26544380
20:07:23: [ MainProcess] After Process            : 26544380
20:07:23: [ MainProcess] After p.start            : 26523268
20:07:24: [   Process-1] done                     : 26338012
20:07:24: [   Process-4] done                     : 26337268
20:07:24: [   Process-3] done                     : 26439444
20:07:24: [   Process-2] done                     : 26438948
20:07:24: [ MainProcess] After p.join             : 26542860

Then I changed the main method to this:

report_memory('Initial')
large_object = load_large_object()
report_memory('After loading the object')
predict(large_object)
report_memory('After Process')

and got the following results: Output 3:

20:13:34: [ MainProcess] Initial                  : 26572580
20:13:35: [ MainProcess] After loading the object : 26538356
20:13:35: [ MainProcess] done                     : 26513804
20:13:35: [ MainProcess] After Process            : 26513804

At this point I have no idea what's going on, but multiprocessing definitely uses more memory.

Answer

Linux uses copy-on-write, which means that when a subprocess is forked, the global variables in each subprocess share the same memory address until a value is modified. Only when a value is modified is it copied.
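
Here is a minimal sketch of that behavior (my illustration, not from the original answer; it assumes Linux, where multiprocessing starts children with fork):

import multiprocessing as mp
import os

# Built once in the parent; forked children inherit it at the same address.
big_global = {i: str(i) for i in range(100000)}

def worker():
    # Reads the inherited global directly: nothing is pickled, and no
    # physical copy is made until a page of big_global is written to.
    print("child %d sees %d entries" % (os.getpid(), len(big_global)))

if __name__ == '__main__':
    procs = [mp.Process(target=worker) for _ in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

(Note that CPython's reference counting writes to object headers, so some pages do get copied over time; the sharing is good but not perfect.)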

So in theory, if the large object is not modified, it can be used by the subprocesses without consuming more memory. Let's test that theory.

Here is your code, spruced up with a bit of memory usage logging:

import sklearn.feature_extraction.text as ftext
import sklearn.linear_model as lm
import multiprocessing as mp
import logging

logger = mp.get_logger()
formatter = logging.Formatter('%(asctime)s: [%(processName)12s] %(message)s',
                              datefmt='%H:%M:%S')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.WARNING)
mp._log_to_stderr = True


def predict(essay="this essay will be predicted"):
    """this method copies the large object in memory which is what im trying to avoid"""
    vectorized_essay = large_object[0].transform(essay)
    large_object[1].predict(vectorized_essay)
    report_memory("done")


def train_and_model():
    """this is very similar to the instantiate_predict method from my first code sample"""
    tfidf_vect = ftext.TfidfVectorizer()
    N = 100000
    corpus = [
        'This is the first document.',
        'This is the second second document.',
        'And the third one.',
        'Is this the first document?', ] * N
    y = [1, 0, 1, 0] * N
    report_memory('Before fit_transform')
    X = tfidf_vect.fit_transform(corpus)
    model = lm.LogisticRegression()
    model.fit(X, y)
    report_memory('After model.fit')
    return (tfidf_vect, model)


def free_memory():
    """
    Return free memory available, including buffer and cached memory
    """
    total = 0
    with open('/proc/meminfo', 'r') as f:
        for line in f:
            line = line.strip()
            if any(line.startswith(field) for field in ('MemFree', 'Buffers', 'Cached')):
                field, amount, unit = line.split()
                amount = int(amount)
                if unit != 'kB':
                    raise ValueError(
                        'Unknown unit {u!r} in /proc/meminfo'.format(u=unit))
                total += amount
    return total


def gen_change_in_memory():
    f = free_memory()
    diff = 0
    while True:
        yield diff
        f2 = free_memory()
        diff = f - f2
        f = f2
change_in_memory = gen_change_in_memory().next

def report_memory(label):
    logger.warn('{l:<25}: {d:+d}'.format(d=change_in_memory(), l=label))

if __name__ == '__main__':
    report_memory('Initial')
    tfidf_vect, model = train_and_model()
    report_memory('After train_and_model')
    large_object = (tfidf_vect, model)
    procs = [mp.Process(target=predict) for i in range(mp.cpu_count())]
    report_memory('After Process')
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    report_memory('After p.join')

It produces:

21:45:01: [ MainProcess] Initial                  : +0
21:45:01: [ MainProcess] Before fit_transform     : +3224
21:45:12: [ MainProcess] After model.fit          : +153572
21:45:12: [ MainProcess] After train_and_model    : -3100
21:45:12: [ MainProcess] After Process            : +0
21:45:12: [   Process-1] done                     : +2232
21:45:12: [   Process-2] done                     : +2976
21:45:12: [   Process-3] done                     : +3596
21:45:12: [   Process-4] done                     : +3224
21:45:12: [ MainProcess] After p.join             : -372

The number reported is the change in KiB of free memory (including cached and buffers). So, for example, the change in free memory between 'Initial' and 'After train_and_model' was about 150MB. Thus, the large_object requires about 150MB.

Then, after the 4 subprocesses have completed, a much smaller amount of memory -- about 12MB in total -- has been consumed. The memory consumed could be due to the creation of the subprocesses plus memory used by the transform and predict methods.

So it appears that the large_object is not being copied, since if it were, we should have seen an increase of about 150MB in memory consumed.

Regarding your runs on the 20 newsgroups data:

Here are the changes in free memory:

On 20 newsgroups data:

| Initial               |       0 |
| After train_and_model |  626804 | <-- Large object requires 627M
| After Process         |       0 |
| After p.start         |   33016 |
| done                  |  200384 | 
| done                  |  -55552 |
| done                  |   -8804 |
| done                  |  -12152 |
| After p.join          | -156240 |

So it looks like instantiating the large object requires 627MB. I am clueless as to why an additional 200+MB have been consumed after the first done is reached.

Using load_large_object:

| Initial                  |       0 |
| After loading the object |   33976 |
| After Process            |       0 |
| After p.start            |   21112 |
| done                     |  185256 |
| done                     |     744 |
| done                     | -102176 |
| done                     |     496 |
| After p.join             | -103912 |

Apparently, the large_object itself requires only 34MB; the rest of the memory, 627 - 34 = 593MB, must have been consumed by the fit_transform and fit methods called in train_and_model.
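
If that is right, one possible workaround (my sketch, reusing the dump_large_object and load_large_object helpers defined earlier) is to run the expensive training in a throwaway child process, so the ~593MB consumed by fit_transform and fit is returned to the OS when the child exits, and the parent loads only the small pickled object:

import multiprocessing as mp

def train_and_dump():
    large_object = train_and_model()  # the peak training memory stays in this child
    dump_large_object(large_object)   # writes large_object.obj to disk

if __name__ == '__main__':
    p = mp.Process(target=train_and_dump)
    p.start()
    p.join()                            # training memory is freed with the child
    large_object = load_large_object()  # parent pays only for the object itself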

Using a single process:

| Initial                  |     0 |
| After loading the object | 34224 |
| done                     | 24552 |
| After Process            |     0 |

This is reasonable.

So the data you've accumulated seems to support the claim that the large object itself is not being copied by each subprocess. But a new mystery arises: why is there a huge consumption of memory between "After p.start" and the first "done"? I don't know the answer to that.

You could try inserting report_memory calls around

vectorized_essay = large_object[0].transform(essay)

and

large_object[1].predict(vectorized_essay)

to see where the extra memory is being consumed. My guess is that one of these scikit-learn methods is choosing to allocate this (relatively) huge amount of memory.
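
For example (a sketch of that instrumentation, reusing the report_memory helper defined above):

def predict(essay="this essay will be predicted"):
    report_memory("before transform")
    vectorized_essay = large_object[0].transform(essay)
    report_memory("after transform")
    large_object[1].predict(vectorized_essay)
    report_memory("after predict")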
