How to use pandas DataFrame in shared memory during multiprocessing?


Question

In one answer to Is shared readonly data copied to different processes for multiprocessing?, a working solution using shared memory for a numpy array is given.

What would the same look like if a pandas DataFrame were used instead?

Background: I would like to be able to write to the DataFrame during multiprocessing, and to process it further after the multiprocessing has finished.

Answer

If you don't want to use dask, you can share a pandas dataframe using shared memory by first converting it to a numpy array and then reconstructing it in the child processes.

import numpy as np
import pandas as pd
from multiprocessing import shared_memory

def create_shared_block(to_share, dtypes):
    # downcast float64 columns to float32 to shrink the shared block
    # (and to match the 'f' fields declared in dtypes below)
    for col, dtype in to_share.dtypes.items():
        if dtype == 'float64':
            to_share[col] = pd.to_numeric(to_share[col], downcast='float')
            
    # move the index into a regular column so it survives the conversion
    # (this assumes a default, unnamed index, which becomes a column called 'index')
    to_share.reset_index(inplace=True)
    
    # drop that auto-generated 'index' column; a plain RangeIndex carries no data
    to_share = to_share.drop('index', axis=1)
    
    # get the dtypes in the same order as the dataframe columns and make sure the types are correct for numpy
    dtypes_sorted = sort_dtypes(to_share, dtypes)
    
    # get the dataframe values in the format expected by numpy
    values = [tuple(x) for x in to_share.values.tolist()]
    
    # create a numpy array
    to_share = np.array(values, dtype=dtypes_sorted)
    
    # create a shared memory of the size of the array
    shm = shared_memory.SharedMemory(create=True, size=to_share.nbytes)
    
    # now create a NumPy array backed by shared memory
    np_array = np.ndarray(to_share.shape, dtype=dtypes_sorted, buffer=shm.buf)
    
    # Copy the original data into shared memory
    np_array[:] = to_share[:]
    return shm, np_array, dtypes_sorted


def sort_dtypes(df, dtypes):
    # dtypes that must become fixed-width numpy byte strings ('category' and
    # 'str' are pandas-side types; numpy structured arrays can't hold objects)
    string_types = ('category', 'object', 'str', '|S')
    dtypes = [(x, '|S{}'.format(df[x].str.len().max())) if y in string_types else (x, y) for x, y in dtypes if
              x in df.columns]
    # build a lookup
    dtypes_dict = {x: y for x, y in dtypes}
    # fix the order
    dtypes_sorted = [(x, dtypes_dict[x]) for x in df.columns]
    return dtypes_sorted
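
For the example frame defined in the parent-process section below, sort_dtypes resolves the two string columns to fixed-width byte strings sized to the longest value in each column, keeping the entries in column order (this trace is illustrative, not part of the original answer):

# >>> sort_dtypes(to_share, dtypes)
# [('w1', '|S9'), ('w2', '|S8'), ('d1', 'f'), ('d2', 'f')]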

# ------PARENT PROCESS-------#
# create your shared memory
to_share = pd.DataFrame([['obstacle', 'obstacle', 2, 3], ['obstacles', 'obstacle', 2, np.nan]], columns=['w1', 'w2', 'd1', 'd2'])
dtypes = [('w1','str'),('w2','|S'),('d1','f'),('d2','f')]
shm, arr, dtypes_sorted = create_shared_block(to_share, dtypes)

# then pass these values to your child processes
shared = (shm.name, arr.shape, dtypes_sorted)

# ------CHILD PROCESS-------#
# assuming the tuple above was passed to the child in a variable called shared,
# the dataframe can be reconstructed as follows (the child needs the same
# numpy/pandas/shared_memory imports as the parent); note the attached block
# is bound to a fresh name rather than shadowing the shared_memory module
existing_shm = shared_memory.SharedMemory(name=shared[0])
np_array = np.ndarray(shared[1], dtype=shared[2], buffer=existing_shm.buf)
columns = [x for x, y in shared[2]]
df = pd.DataFrame(np_array, columns=columns)
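
For completeness, here is one way the two halves might be wired together; the worker function and its name are illustrative rather than part of the original answer, and the parent-side setup is moved under a __main__ guard so a spawned child does not re-execute it. The parent must keep the SharedMemory object alive while children use it, and should close and unlink it once everyone is done:

import multiprocessing as mp

def worker(shared):
    # child entry point: reattach to the block by name and rebuild the frame
    existing_shm = shared_memory.SharedMemory(name=shared[0])
    np_array = np.ndarray(shared[1], dtype=shared[2], buffer=existing_shm.buf)
    df = pd.DataFrame(np_array, columns=[x for x, y in shared[2]])
    print(df)
    existing_shm.close()  # detach from the block, but don't unlink it here

if __name__ == '__main__':
    to_share = pd.DataFrame(
        [['obstacle', 'obstacle', 2, 3], ['obstacles', 'obstacle', 2, np.nan]],
        columns=['w1', 'w2', 'd1', 'd2'])
    dtypes = [('w1', 'str'), ('w2', '|S'), ('d1', 'f'), ('d2', 'f')]
    shm, arr, dtypes_sorted = create_shared_block(to_share, dtypes)
    shared = (shm.name, arr.shape, dtypes_sorted)
    p = mp.Process(target=worker, args=(shared,))
    p.start()
    p.join()
    shm.close()
    shm.unlink()  # free the shared block once all processes are finished

Note that fixed-width byte-string columns come back as bytes (e.g. b'obstacle'); df['w1'].str.decode('utf-8') restores ordinary strings.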

This saved some memory in my app when sharing a 100k-row dataframe, but probably not as much as an established library like dask could. And I'm not too sure of the overhead involved in recreating the pandas dataframe - I'd like to think it just references the shared numpy array and adds some extra stuff on top to make it a df.
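
Whether the reconstructed frame actually references the shared buffer is easy to probe with np.shares_memory (how much pandas copies depends on the dtypes and the pandas version, so treat this as a quick diagnostic rather than a guarantee):

# does a rebuilt column still point into the shared block?
print(np.shares_memory(np_array, df['d1'].to_numpy()))

In practice pandas will typically copy here, since the fields of a structured array are interleaved row by row while pandas stores each column contiguously; the main saving then comes from not pickling the whole frame to every worker rather than from zero-copy reconstruction.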

