Pandas and Multiprocessing Memory Management: Splitting a DataFrame into Multiple Chunks

Question

I have to process a huge pandas.DataFrame (several tens of GB) row by row, where each row operation is quite lengthy (a couple of tens of milliseconds). So I had the idea to split the frame into chunks and process each chunk in parallel using multiprocessing. This does speed up the task, but the memory consumption is a nightmare.

Although each child process should in principle only consume a tiny chunk of the data, it needs (almost) as much memory as the original parent process that contained the original DataFrame. Even deleting the used parts in the parent process does not help.

I wrote a minimal example that replicates this behavior. The only thing it does is create a large DataFrame with random numbers, chunk it into little pieces of at most 100 rows, and simply print some information about each chunk during multiprocessing (here via an mp.Pool of size 4).

The main function that is executed in parallel:

def just_wait_and_print_len_and_idx(df):
    """Waits for 5 seconds and prints df length and first and last index"""
    # Extract some info
    idx_values = df.index.values
    first_idx, last_idx = idx_values[0], idx_values[-1]
    length = len(df)
    pid = os.getpid()

    # Pretend to do some lengthy work
    time.sleep(1)

    # Print the info
    print('First idx {}, last idx {} and len {} '
          'from process {}'.format(first_idx, last_idx, length, pid))

The helper generator to chunk a DataFrame into little pieces:

def df_chunking(df, chunksize):
    """Splits df into chunks, drops data of original df inplace"""
    count = 0 # Counter for chunks
    while len(df):
        count += 1
        print('Preparing chunk {}'.format(count))
        # Return df chunk
        yield df.iloc[:chunksize].copy()
        # Delete data in place because it is no longer needed
        df.drop(df.index[:chunksize], inplace=True)

And the main routine:

def main():
    # Job parameters
    n_jobs = 4  # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100  # Maximum size of Frame Chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))

    pool.close()
    pool.join()

    print('DONE')

The standard output is as follows:

Starting MP
Preparing chunk 1
Preparing chunk 2
First idx 0, last idx 99 and len 100 from process 9913
First idx 100, last idx 199 and len 100 from process 9914
Preparing chunk 3
First idx 200, last idx 299 and len 100 from process 9915
Preparing chunk 4
...
DONE

The Problem:

The main process needs about 120 MB of memory. However, the child processes of the pool need the same amount of memory, although they each only contain 1% of the original DataFrame (chunks of size 100 vs. an original length of 10000). Why?
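
As an aside, not part of the original post: if you want to verify these numbers yourself, a small helper along the following lines can be called from inside just_wait_and_print_len_and_idx to print each worker's resident memory. It is only a sketch and assumes the third-party psutil package is available.

import os

import psutil  # assumed third-party dependency, not used in the original script


def print_rss():
    """Print the resident set size (RSS) of the current process in MB."""
    rss_mb = psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2
    print('Process {} currently uses {:.1f} MB'.format(os.getpid(), rss_mb))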

What can I do about it? Does Python (3) send the whole DataFrame to each child process despite my chunking? Is that a problem of pandas memory management or the fault of multiprocessing and data pickling? Thanks!

Whole script for simple copy and paste in case you want to try it yourself:

import multiprocessing as mp
import pandas as pd
import numpy as np
import time
import os


def just_wait_and_print_len_and_idx(df):
    """Waits for 5 seconds and prints df length and first and last index"""
    # Extract some info
    idx_values = df.index.values
    first_idx, last_idx = idx_values[0], idx_values[-1]
    length = len(df)
    pid = os.getpid()

    # Pretend to do some lengthy work
    time.sleep(1)

    # Print the info
    print('First idx {}, last idx {} and len {} '
          'from process {}'.format(first_idx, last_idx, length, pid))


def df_chunking(df, chunksize):
    """Splits df into chunks, drops data of original df inplace"""
    count = 0 # Counter for chunks
    while len(df):
        count += 1
        print('Preparing chunk {}'.format(count))
        # Return df chunk
        yield df.iloc[:chunksize].copy()
        # Delete data in place because it is no longer needed
        df.drop(df.index[:chunksize], inplace=True)


def main():
    # Job parameters
    n_jobs = 4  # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100  # Maximum size of Frame Chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))

    pool.close()
    pool.join()

    print('DONE')


if __name__ == '__main__':
    main()

Answer

Ok, so I figured it out after the hint by Sebastian Opałczyński in the comments.

The problem is that the child processes are forked from the parent, so all of them contain a reference to the original DataFrame. However, the frame is manipulated in the original process, so the copy-on-write behavior slowly copies more and more of it and eventually kills the whole thing when the limit of the physical memory is reached.
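
As a side note (not from the original post), the default start method depends on the platform, which is why this issue typically shows up on Linux; you can check what your interpreter uses:

import multiprocessing as mp

# 'fork' by default on Linux; 'spawn' on Windows (and on macOS since Python 3.8)
print(mp.get_start_method())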

There is a simple solution: Instead of pool = mp.Pool(n_jobs), I use the new context feature of multiprocessing:

ctx = mp.get_context('spawn')
pool = ctx.Pool(n_jobs)

This guarantees that the Pool processes are just spawned and not forked from the parent process. Accordingly, none of them has access to the original DataFrame and all of them only need a tiny fraction of the parent's memory.
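
For completeness, here is a sketch of how main() from the script above looks with this change applied; everything else stays the same:

def main():
    # Job parameters
    n_jobs = 4  # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100  # Maximum size of Frame Chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))

    # Create the pool from a 'spawn' context so the workers do not inherit
    # the parent's address space via fork
    ctx = mp.get_context('spawn')
    pool = ctx.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))

    pool.close()
    pool.join()

    print('DONE')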

Note that mp.get_context('spawn') is only available in Python 3.4 and newer.
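
As an alternative (again a side note, not part of the original answer), the start method can also be set once globally; every Pool created afterwards then uses it:

if __name__ == '__main__':
    # May be called at most once per program, before any Pool is created
    mp.set_start_method('spawn')
    main()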
