如何并行化行式 Pandas 数据框的 apply() 方法 [英] How to parallelize the row wise Pandas dataframe's apply() method

查看:53
本文介绍了如何并行化行式 Pandas 数据框的 apply() 方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下代码:

import pandas as pd
import time

def enrich_str(str):
        
    val1 = f'{str}_1'
    val2 = f'{str}_2'
    val3 = f'{str}_3'
    time.sleep(3)
    
    return val1, val2, val3
    
def enrich_row(passed_row):
    col_name = str(passed_row['colName'])
    my_string = str(passed_row[col_name])
    
    val1, val2, val3 = enrich_str(my_string)
    
    passed_row['enriched1'] = val1
    passed_row['enriched2'] = val2
    passed_row['enriched3'] = val3
    
    return passed_row


df = pd.DataFrame({'numbers': [1, 2, 3, 4, 5], 'colors': ['red', 'white', 'blue', 'orange', 'red']}, 
                  columns=['numbers', 'colors'])

df['colName'] = 'colors'

tic = time.perf_counter()
enriched_df = df.apply(enrich_row, col_name='colors', axis=1)
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")

enriched_df

获取输出数据帧需要 15 秒,如下所示:

It takes 15 seconds to get the output dataframe like the following one:

现在我想在我的机器上使用多个线程来并行化浓缩操作.我探索了很多解决方案,例如 Dasknumba,但对我来说似乎没有一个是直截了当的.

Now I want to parallelize the enrichment operation using multiple threads on my machine. I explored a lot of solution, like Dask, numba, but none of them seems strightforward to me.

然后我偶然发现了 multiprocessing 库和它的 pool.imaps() 方法.所以我尝试运行以下代码:

Then I stumbled upon the multiprocessing library and its pool.imaps() method. So I tried to run the following code:

import multiprocessing as mp

tic = time.perf_counter()
pool = mp.Pool(5)
result = pool.imap(enrich_row, df.itertuples(), chunksize=1)
pool.close()
pool.join()
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")
result

大约需要 2 秒,result 不是 Pandas 数据帧.我不知道我哪里出错了.

It takes about 2 seconds and result isn't a Pandas dataframe. I can't figure out where I'm going wrong.

推荐答案

我建议您使用 pathos fork<multiprocessing 的/a>,因为它可以更好地处理数据帧的酸洗.imap 返回一个迭代器,而不是一个 DataFrame,所以你必须把它转换回来:

I recommend you use the pathos fork of multiprocessing, because it will handle pickling the DataFrames better. imap returns an iterator, not a DataFrame, so you have to convert it back:

def enrich_row(row_tuple):
    passed_row = row_tuple[1]
    col_name = str(passed_row['colName'])
    my_string = str(passed_row[col_name])
    
    val1, val2, val3 = enrich_str(my_string)
    
    passed_row['enriched1'] = val1
    passed_row['enriched2'] = val2
    passed_row['enriched3'] = val3
    
    return passed_row

df = pd.DataFrame({'numbers': [1, 2, 3, 4, 5], 'colors': ['red', 'white', 'blue', 'orange', 'red']}, 
                  columns=['numbers', 'colors'])

df['colName'] = 'colors'

from pathos.multiprocessing import Pool

tic = time.perf_counter()
result = Pool(8).imap(enrich_row, df.iterrows(), chunksize=1)
df = pd.DataFrame(result)
toc = time.perf_counter()

print(f"{df.shape[0]} rows enriched in {toc - tic:0.4f} seconds")
print(df)

请注意,我使用的是 df.iterrows(),它返回一个元组迭代器 (row_number, row),所以我修改了 enrich_row 处理这种格式.

Note that I'm using df.iterrows() which returns an iterator of tuples (row_number, row), so I modified enrich_row to handle this format.

这篇关于如何并行化行式 Pandas 数据框的 apply() 方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆