Is there a good way to avoid memory deep copy or to reduce time spent in multiprocessing?

Problem description

I am making a memory-based real-time calculation module for "big data" using the Pandas module in a Python environment.

So response time is critical to the quality of this module.

To process the large data set, I split the data and process the sub-splits in parallel.

Much time is spent storing the results of the sub data (line 21 of the code below).

I think that internally a memory deep copy occurs, or that the sub data passed to each process is not shared in memory.

If I had written the module in C or C++, I would use pointers or references, like below:

"process = Process(target = addNewDerivedColumn,args = [resultList,& sub_dataframe ])"

"process=Process(target=addNewDerivedColumn, args=[resultList, &sub_dataframe])"

"process = Process(target = addNewDerivedColumn,args = [resultList,sub_dataframe])

"process=Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])

def addNewDerivedColumn(resultList, split_sub_dataframe& ):.... "

def addNewDerivedColumn(resultList, split_sub_dataframe&):.... "

Is there a good way to avoid memory deep copying or to reduce the time spent in multiprocessing? "Not elegant" is fine; I am ready to make my code dirty. I tried weakref, RawValue, RawArray, Value, and Pool, but all failed.
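
(For reference, a minimal sketch of one way RawArray can be made to work. Much of the cost of line 21 below is the Manager proxy pickling each whole sub-DataFrame and shipping it to the manager process; backing a flat float buffer with multiprocessing.RawArray and viewing it as a NumPy array in every process lets workers write results in place instead of returning copies. The names and sizes here are illustrative, not the asker's code.)

import multiprocessing as mp
import numpy as np

def worker(buf, start, end):
    # np.frombuffer gives a view over the shared buffer, not a copy
    arr = np.frombuffer(buf, dtype=np.float64)
    arr[start:end] = np.abs(arr[start:end]) / 2  # write results in place

if __name__ == '__main__':
    n = 1000
    buf = mp.RawArray('d', n)            # unlocked shared memory, 'd' = C double
    arr = np.frombuffer(buf, dtype=np.float64)
    arr[:] = np.random.randn(n)

    step = n // 4
    procs = [mp.Process(target=worker, args=(buf, i * step, (i + 1) * step))
             for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(arr[:5])                       # the children's writes are visible here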

The module is being developed on macOS and will eventually run on Linux or Unix.

Do not consider Windows OS.

Here is the code. The real code is at my office, but the structure and logic are the same as the real one.

1 # -*- coding: UTF-8 -*-
2 import pandas as pd
3 import numpy as np
4 from multiprocessing import *
5 import time
6
7
8 def addNewDerivedColumn(resultList, split_sub_dataframe):
9
10    split_sub_dataframe['new_column'] = np.abs(split_sub_dataframe['column_01'] + split_sub_dataframe['column_01']) / 2
11
12    print(split_sub_dataframe.head())
13
14    '''
15    I think the whole sub-dataframe is copied into resultList by value, not by reference,
16    and this is where much of the time is spent.
17    Compare the elapsed time with line 21 commented out against the uncommented run.
18    On MS Windows, no significant difference in elapsed time shows up.
19    On Linux or Mac OS, the difference is big.
20    '''
21    resultList.append(split_sub_dataframe)
22
23
24
25 if __name__ == "__main__":
26
27    # example data generation
28    # the record count of the real data is over 1 billion with about 10 columns.
29    dataframe = pd.DataFrame(np.random.randn(100000000, 4), columns=['column_01', 'column_02', 'column_03', 'column_04'])
30
31
32    print('start...')
33    start_time = time.time()
34
35    # to launch 5 processes in parallel, I split the dataframe into five sub-dataframes
36    split_dataframe_list = np.array_split(dataframe, 5)
37
38    # multiprocessing
39    manager = Manager()
40
41    # result list
42    resultList = manager.list()
43    processList = []
44
45    for sub_dataframe in split_dataframe_list:
46        process = Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])
47        processList.append(process)
48
49    for proc in processList:
50        proc.start()
51    for proc in processList:
52        proc.join()
53
54
55    print('elapsed time  : ', np.round(time.time() - start_time, 3))

Recommended answer

You will get better performance if you keep interprocess communication to a minimum. Therefore, instead of passing sub-DataFrames as arguments, just pass index values. The subprocess can slice the common DataFrame itself.

When a subprocess is spawned, it gets a copy of all the globals defined in the calling module of the parent process. Thus, if the large DataFrame, df, is defined in the globals before you spawn a multiprocessing pool, then each spawned subprocess will have access to df.

On Windows, where there is no fork(), a new python process is started and the calling module is imported. Thus, on Windows, the spawned subprocess has to regenerate df from scratch, which could take time and much additional memory.

On Linux, however, you have copy-on-write. This means that the spawned subprocess accesses the original globals (of the calling module) without copying them. Only when the subprocess tries to modify the global does Linux make a separate copy before the value is modified.
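
As a minimal illustration of these fork semantics (the DataFrame name and sizes here are made up for the demo), a forked child can read a parent-defined global that was never passed to it:

import os
import multiprocessing as mp
import numpy as np
import pandas as pd

# defined at module level, before any process is forked
df = pd.DataFrame(np.random.randn(10**6, 4), columns=list('abcd'))

def child_reads():
    # under fork (Linux/Unix), this touches the parent's pages directly;
    # no copy is made as long as the child only reads
    print('child %d sees %d rows, mean=%.4f' % (os.getpid(), len(df), df['a'].mean()))

if __name__ == '__main__':
    p = mp.Process(target=child_reads)
    p.start()
    p.join()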

So you can enjoy a performance gain if you avoid modifying globals in your subprocesses. I suggest using the subprocess only for computation. Return the value of the computation, and let the main process collate the results and modify the original DataFrame.

import pandas as pd
import numpy as np
import multiprocessing as mp
import time

def compute(start, end):
    # slice the global df inherited via fork instead of receiving a copy
    sub = df.iloc[start:end]
    return start, end, np.abs(sub['column_01'] + sub['column_01']) / 2

def collate(retval):
    # runs in the main process; only the computed values cross the process boundary
    start, end, arr = retval
    df.iloc[start:end, df.columns.get_loc('new_column')] = arr.values

def window(seq, n=2):
    """
    Returns a sliding window (of width n) over data from the sequence
    s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...
    """
    for i in range(len(seq) - n + 1):
        yield tuple(seq[i:i + n])

if __name__ == "__main__":
    # the record count of the real data is over 1 billion with about 10 columns.
    N = 10**3
    df = pd.DataFrame(np.random.randn(N, 4),
                      columns=['column_01', 'column_02', 'column_03', 'column_04'])
    df['new_column'] = np.empty(N, dtype='float')

    # fork the pool only after df is fully set up, so the workers inherit it
    pool = mp.Pool()

    start_time = time.time()
    idx = np.linspace(0, N, 5 + 1).astype('int')
    for start, end in window(idx, 2):
        pool.apply_async(compute, args=[start, end], callback=collate)

    pool.close()
    pool.join()
    print('elapsed time  : ', np.round(time.time() - start_time, 3))
    print(df.head())
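
One caveat with this sketch: it relies on the fork start method. Since Python 3.8, macOS defaults to spawn, under which the pool workers will not inherit df; calling mp.set_start_method('fork') early in the program restores the behavior described above. Also, CPython's reference counting writes to object headers, so copy-on-write pages can still be dirtied even by read-only access; the sharing is cheap, but not perfectly copy-free.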
