pandas: optimizing my code (groupby() / apply())


Problem description

I have a dataframe of shape (RxC) 1.5M x 128. I do the following:

  1. I do a groupby() based on 6 columns. This creates ~8700 subgroups, each of shape 538 x 122.
  2. On each subgroup I run apply(). This function computes the frequency percentage of each categorical value PER column (i.e. 122 of them) in the subgroup.

So my (pseudo)code:

<df = Read dataframe from file>
g = df.groupby(grp_cols)
g[nongrp_cols].apply(lambda d: d.apply(lambda s: s.value_counts()) / len(d.index))
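Concretely, the pseudocode above can be run end-to-end on a small simulated frame (my own sketch; only grp_cols/nongrp_cols come from the question, the data and column names g1/g2/c1/c2 are made up):

```python
import numpy as np
import pandas as pd

# Simulated stand-in for the real data (the actual frame is 1.5M x 128
# with 6 grouping columns and 122 categorical value columns).
rng = np.random.default_rng(0)
n = 200
df = pd.DataFrame({
    'g1': rng.choice(['a', 'b'], size=n),
    'g2': rng.choice(['x', 'y'], size=n),
    'c1': rng.integers(0, 3, size=n),
    'c2': rng.integers(0, 3, size=n),
})

grp_cols = ['g1', 'g2']
nongrp_cols = ['c1', 'c2']

g = df.groupby(grp_cols)
# per subgroup: frequency of each categorical value, per column
freqs = g[nongrp_cols].apply(lambda d: d.apply(lambda s: s.value_counts()) / len(d.index))
```

The result is indexed by (g1, g2, categorical value), one frequency column per original value column.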

The code works OK for me, so now I'm profiling it to improve performance. The apply() function takes about 20-25 minutes to run. I believe the problem is that it iterates over every column (122 of them) for each of the ~8700 subgroups, which may not be the best way (given how I have coded it).

Can anyone recommend ways I can try to speed this up?

I tried using a Python multiprocessing pool (8 processes) to divide the subgroups into equal sets for processing, but ended up getting a pickling error...

Thanks.

Answer

pd.DataFrame.groupby.apply really gives us a lot of flexibility (unlike agg/filter/transform, it allows you to reshape each subgroup to any shape; in your case, from 538 x 122 to N_categories x 122). But it does come at a cost: the flexible function is applied to each subgroup one by one, with no vectorization.

I still think the way to solve it is to use multiprocessing. The pickle error you encountered is most likely because you defined some functions inside your multiprocessing function. The rule is that you must move all functions to the top level of the module. See the code below.

import pandas as pd
import numpy as np

# simulate your data with int 0 - 9 for categorical values
df = pd.DataFrame(np.random.choice(np.arange(10), size=(538, 122)))
# simulate your groupby operation; not as crazy as 8700 sub-groups, just 800 groups for illustration
sim_keys = ['ROW' + str(x) for x in np.arange(800)]
big_data = pd.concat([df] * 800, axis=0, keys=sim_keys)
big_data.shape
Out[337]: (430400, 122)

# Without multiprocessing
# ===================================================
by_keys = big_data.groupby(level=0)

sample_group = list(by_keys)[0][1]
sample_group.shape

def your_func(g):
    return g.apply(lambda s: s.value_counts()) / len(g.index)

def test_no_multiprocessing(gb, apply_func):
    return gb.apply(apply_func)

%time result_no_multiprocessing = test_no_multiprocessing(by_keys, your_func)

CPU times: user 1min 26s, sys: 4.03 s, total: 1min 30s
Wall time: 1min 27s

Pretty slow. Let's use the multiprocessing module:

# multiprocessing for pandas dataframe apply
# ===================================================
# to avoid the pickle error, functions must be defined at the TOP level;
# moving 'process' inside 'test_with_multiprocessing' raises a pickle error
def process(df):
    return df.groupby(level=0).apply(your_func)

def test_with_multiprocessing(big_data, apply_func):

    import multiprocessing as mp

    p = mp.Pool(processes=8)
    # split it into 8 chunks
    split_dfs = np.array_split(big_data, 8, axis=0)
    # define the mapping function, wrapping it to take just df as input
    # apply to each chunk
    df_pool_results = p.map(process, split_dfs)

    p.close()

    # combine together
    result = pd.concat(df_pool_results, axis=0)

    return result


%time result_with_multiprocessing = test_with_multiprocessing(big_data, your_func)

CPU times: user 984 ms, sys: 3.46 s, total: 4.44 s
Wall time: 22.3 s

Now it's much faster, especially in CPU time. Although there is some overhead in splitting the data and recombining the results, this is expected to be about 4-6 times faster than the non-multiprocessing case on an 8-core processor.
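As an aside beyond the multiprocessing route: when all that's needed is per-column value frequencies, the nested apply can often be replaced by a single, largely vectorized pass, melting the value columns into long form and using groupby value_counts(normalize=True). A minimal sketch on simulated data (the column names here are illustrative, not from the question):

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
df = pd.DataFrame(rng.integers(0, 10, size=(1000, 4)), columns=list('ABCD'))
df['key'] = rng.choice(['g1', 'g2', 'g3'], size=1000)

# long form: one row per (group key, value column, value)
long = df.melt(id_vars='key', var_name='col', value_name='val')
# normalized counts per (key, column) in one vectorized pass
freqs = long.groupby(['key', 'col'])['val'].value_counts(normalize=True)
```

This avoids calling a Python function once per subgroup per column, so it may be worth benchmarking against the multiprocessing version on the real data.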

Finally, check that the two results are the same.

import pandas.util.testing as pdt  # deprecated in newer pandas; use pandas.testing instead

pdt.assert_frame_equal(result_no_multiprocessing, result_with_multiprocessing)

The test passes.

