Multiprocessing writing to pandas dataframe


Question

So what I am trying to do with the following code is to read a list of lists, put each one through a function called checker, and then have log_result deal with the result of checker. I am trying to do this using multiprocessing because the variable rows_to_parse in reality has millions of rows, so using multiple cores should speed up this process by a considerable amount.

The code at present doesn't work and crashes Python.

Questions/issues I have:

  1. I want the existing df held in the variable df to keep its index throughout the process, because otherwise log_result will get confused about which row needs updating.
  2. I am fairly certain that apply_async is not the appropriate multiprocessing function for this task, because the order in which the workers read and write df could corrupt it.
  3. I think a queue may need to be set up for reading and writing df, but I am unsure how to go about doing that.

Thank you for your help.

import pandas as pd
import multiprocessing
from functools import partial

def checker(a,b,c,d,e):
    match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)]
    index_of_match = match.index.tolist()
    if len(index_of_match) == 1: #one match in df
        return index_of_match
    elif len(index_of_match) > 1: #not likely, because duplicates will be removed before the if __name__ == '__main__': block
        return [index_of_match[0]]
    else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df
        return [a,b,c,d,e]



def log_result(result, dataf):
    if len(result) == 1: #one match found in df
        dataf.loc[result[0]]['e'] += 1
    else: #append new row to existing df
        new_row = pd.DataFrame([result],columns=cols)
        dataf = dataf.append(new_row,ignore_index=True)


def apply_async_with_callback(parsing_material, dfr):
    pool = multiprocessing.Pool()
    for var_a, var_b, var_c, var_d, var_e in parsing_material:
        pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result,dataf=dfr))
    pool.close()
    pool.join()



if __name__ == '__main__':
    #setting up main dataframe
    cols = ['a','b','c','d','e']
    existing_data = [["YES","A","16052011","13031999",3],
                    ["NO","Q","11022003","15081999",3],
                    ["YES","A","22082010","03012001",9]]

    #main dataframe
    df = pd.DataFrame(existing_data,columns=cols)

    #new data
    rows_to_parse = [['NO', 'A', '09061997', '06122003', 5],
                    ['YES', 'W', '17061992', '26032012', 6],
                    ['YES', 'G', '01122006', '07082014', 2],
                    ['YES', 'N', '06081992', '21052008', 9],
                    ['YES', 'Y', '18051995', '24011996', 6],
                    ['NO', 'Q', '11022003', '15081999', 3],
                    ['NO', 'O', '20112004', '28062008', 0],
                    ['YES', 'R', '10071994', '03091996', 8],
                    ['NO', 'C', '09091998', '22051992', 1],
                    ['YES', 'Q', '01051995', '02012000', 3],
                    ['YES', 'Q', '26022015', '26092007', 5],
                    ['NO', 'F', '15072002', '17062001', 8],
                    ['YES', 'I', '24092006', '03112003', 2],
                    ['YES', 'A', '22082010', '03012001', 9],
                    ['YES', 'I', '15072016', '30092005', 7],
                    ['YES', 'Y', '08111999', '02022006', 3],
                    ['NO', 'V', '04012016', '10061996', 1],
                    ['NO', 'I', '21012003', '11022001', 6],
                    ['NO', 'P', '06041992', '30111993', 6],
                    ['NO', 'W', '30081992', '02012016', 6]]


    apply_async_with_callback(rows_to_parse, df)

Answer

Updating DataFrames like this under multiprocessing isn't going to work:

dataf = dataf.append(new_row,ignore_index=True)

For one thing it is very inefficient: each append is O(n), so the loop is O(n^2) in total. The preferred way is to concat the pieces together in one pass.
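To make the cost difference concrete, here is a minimal illustrative comparison (not from the original answer; note that DataFrame.append was removed in pandas 2.0, so the slow variant only runs on older pandas):

import pandas as pd

pieces = [pd.DataFrame({'x': [i]}) for i in range(1000)]

# O(n^2): each append copies everything accumulated so far
slow = pd.DataFrame(columns=['x'])
for piece in pieces:
    slow = slow.append(piece, ignore_index=True)

# O(n): a single concat over all the pieces in one pass
fast = pd.concat(pieces, ignore_index=True)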

For another, and more importantly, dataf is not locked for each update, so there is no guarantee that two operations won't conflict (I'm guessing this is what crashes Python).

Finally, append doesn't act in place, so the rebound variable dataf is discarded once the callback finishes, and no changes ever reach the parent dataf.

We could use a multiprocessing Manager list or dict: a list if you don't care about order, or a dict if you do (e.g. keyed by an enumerate index), since the async results are not returned in any well-defined order.
(Or we could create an object which implements a Lock ourselves; see Eli Bendersky's writeup.)
So the following changes are made:

df = pd.DataFrame(existing_data,columns=cols)
# becomes
df = pd.DataFrame(existing_data,columns=cols)
manager = multiprocessing.Manager()
d = manager.list([df])

dataf = dataf.append(new_row,ignore_index=True)
# becomes
d.append(new_row)

Now, once the async work has finished, you have a manager list of DataFrames. You can concat these (with ignore_index=True) to get the desired result:

pd.concat(d, ignore_index=True)

This should do the trick.
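For concreteness, here is a minimal end-to-end sketch of the wiring under these changes. The names make_row, collect, and results are illustrative stand-ins for checker, log_result, and d, and the worker is simplified to just build a one-row DataFrame:

import multiprocessing
from functools import partial

import pandas as pd

COLS = ['a', 'b', 'c', 'd', 'e']

def make_row(row):
    # stand-in for checker: the worker returns a one-row DataFrame
    return pd.DataFrame([row], columns=COLS)

def collect(new_row, results):
    # callbacks run back in the parent process, so this append
    # cannot race with the workers
    results.append(new_row)

if __name__ == '__main__':
    df = pd.DataFrame([['YES', 'A', '16052011', '13031999', 3]], columns=COLS)

    manager = multiprocessing.Manager()
    results = manager.list([df])  # seed with the existing frame

    pool = multiprocessing.Pool()
    for row in [['NO', 'A', '09061997', '06122003', 5],
                ['YES', 'W', '17061992', '26032012', 6]]:
        pool.apply_async(make_row, args=(row,),
                         callback=partial(collect, results=results))
    pool.close()
    pool.join()

    combined = pd.concat(list(results), ignore_index=True)
    print(combined)

Because apply_async callbacks all execute in the parent process, a plain Python list would also collect results safely here; the manager list simply matches the suggestion above and makes the shared-state intent explicit.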

Note: creating the new_row DataFrame at each stage is also less efficient than letting pandas parse the list of lists directly into a DataFrame in one go. Hopefully this is a toy example; you really want your chunks to be quite large to get wins from multiprocessing (I've heard 50kb as a rule of thumb...), so a row at a time is never going to be a win here.

Aside: you should avoid using globals (like df) in your code; it's much cleaner to pass them to your functions explicitly (in this case, as an argument to checker), as sketched below.
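A minimal sketch of that change, assuming the question's column layout (the partial binding shown here is illustrative, not the original code):

from functools import partial

def checker(dataf, a, b, c, d, e):
    # dataf is now an explicit parameter instead of a module-level global
    match = dataf[(dataf['a'] == a) & (dataf['b'] == b) & (dataf['c'] == c) &
                  (dataf['d'] == d) & (dataf['e'] == e)]
    index_of_match = match.index.tolist()
    return index_of_match if index_of_match else [a, b, c, d, e]

# in apply_async_with_callback, bind the frame once and reuse it:
#     bound_checker = partial(checker, dfr)
#     pool.apply_async(bound_checker, args=(var_a, var_b, var_c, var_d, var_e), ...)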

