Efficiently applying a function to a grouped pandas DataFrame in parallel

Question

I often need to apply a function to the groups of a very large DataFrame (of mixed data types) and would like to take advantage of multiple cores.

I can create an iterator from the groups and use the multiprocessing module, but it is not efficient because every group and the results of the function must be pickled for messaging between processes.

Is there any way to avoid the pickling, or even to avoid copying the DataFrame entirely? It looks like the shared memory functions of the multiprocessing module are limited to numpy arrays. Are there any other options?
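
To make the cost described above concrete, here is a hedged, single-process simulation of the round trip. It does not start any processes. It only performs the pickle.dumps/pickle.loads calls that a Pool-based approach makes for every group (the task going out and the result coming back). The toy data and the sum_group helper are hypothetical.

```python
import pickle
from collections import defaultdict

# Hypothetical toy data standing in for two DataFrame columns.
keys = [1, 2, 1, 2, 1, 1, 0]
vals = list(range(7))

# Split rows into groups, as iterating over df.groupby(...) would.
groups = defaultdict(list)
for k, v in zip(keys, vals):
    groups[k].append(v)

def sum_group(payload: bytes) -> bytes:
    """Worker side: unpickle one group, reduce it, pickle the result."""
    key, rows = pickle.loads(payload)
    return pickle.dumps((key, sum(rows)))

# Parent side: each group is pickled going out and each result is unpickled
# coming back; a multiprocessing.Pool performs these steps implicitly.
bytes_sent = 0
results = {}
for key, rows in groups.items():
    payload = pickle.dumps((key, rows))
    bytes_sent += len(payload)
    out_key, total = pickle.loads(sum_group(payload))
    results[out_key] = total

print(results)     # {1: 11, 2: 4, 0: 6}
print(bytes_sent)  # grows with the size of every group
```

With a real DataFrame each payload is a whole sub-frame, so the serialized volume is roughly the size of the data itself, paid twice per call (out and back).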

Recommended answer

From the comments above, it seems that this is planned for pandas at some point (there's also an interesting-looking rosetta project, which I just noticed).

However, until all parallel functionality is incorporated into pandas, I noticed that it's very easy to write efficient, non-memory-copying parallel augmentations to pandas directly, using Cython + OpenMP and C++.

Here's a short example of a parallel groupby-sum, which is used like this:

import pandas as pd
import para_group_demo  # the compiled Cython extension whose source follows

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)})
print(para_group_demo.sum(df.a, df.b))

The output is:

     sum
key
0      6
1     11
2      4
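
The expected output can be cross-checked with a plain, single-threaded Python group-by-sum. This is a reference sketch only, not part of the extension, and groupby_sum is a hypothetical helper name. The keys are sorted because the extension collects its results from a C++ unordered_map, whose iteration order is unspecified.

```python
from collections import defaultdict

def groupby_sum(keys, vals):
    """Single-threaded reference: map each key to the sum of its values."""
    totals = defaultdict(int)
    for k, v in zip(keys, vals):
        totals[k] += v
    # Return keys in sorted order so the result lines up with the table above.
    return dict(sorted(totals.items()))

print(groupby_sum([1, 2, 1, 2, 1, 1, 0], range(7)))  # {0: 6, 1: 11, 2: 4}
```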


Note: Doubtlessly, this simple example's functionality will eventually become part of pandas. Some things, however, will remain more natural to parallelize in C++ for some time, and it's important to be aware of how easy it is to combine such code with pandas.

To do this, I wrote a simple single-source-file extension whose code follows.

It starts with some build directives, imports, and type definitions:

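# distutils: language = c++
# distutils: extra_compile_args = -fopenmp
# distutils: extra_link_args = -fopenmp
# (-fopenmp assumes GCC or Clang with OpenMP support; MSVC uses /openmp)

from libc.stdint cimport int64_t, uint64_t
from libcpp.vector cimport vector
from libcpp.unordered_map cimport unordered_map

cimport cython
from cython.operator cimport dereference as deref, preincrement as inc
from cython.parallel import prange

import pandas as pd

# key -> running sum; signed, so negative values in `vals` sum correctly
ctypedef unordered_map[int64_t, int64_t] counts_t
ctypedef unordered_map[int64_t, int64_t].iterator counts_it_t
# one counts_t per thread
ctypedef vector[counts_t] counts_vec_t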

The C++ unordered_map holds the per-key sums computed by a single thread, and the vector holds one such map per thread.
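
In Python terms, this layout is roughly a list of dicts, one per worker. The sketch below is only an analogy (NUM_THREADS and counts are illustrative names); the point it shows is that each worker writes to its own map, so the summing phase needs no locks, at the price of the same key possibly appearing in several maps.

```python
from collections import defaultdict

NUM_THREADS = 4  # matches the thread count hard-coded in the answer

# One private key -> sum map per worker: the analogue of vector[counts_t].
counts = [defaultdict(int) for _ in range(NUM_THREADS)]

# Worker i only ever updates counts[i]; key 1 can end up in several maps.
counts[0][1] += 5
counts[3][1] += 7

print([dict(c) for c in counts])  # [{1: 5}, {}, {}, {1: 7}]
```

A list comprehension is used on purpose: [defaultdict(int)] * NUM_THREADS would make all four entries the same object, which is exactly the sharing the per-thread design avoids.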

Now for the function sum itself. It starts off with typed memory views for fast access:

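def sum(crit, vals):
    # Typed memoryviews over the int64 column data: direct access, no copy.
    # `const` also accepts read-only arrays (e.g. from pandas with Copy-on-Write).
    cdef const int64_t[:] crit_view = crit.values
    cdef const int64_t[:] vals_view = vals.values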
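
Cython's typed memoryviews are built on Python's buffer protocol, which the standard-library memoryview also uses. As a rough analogy only (plain Python, not Cython), the sketch shows the two properties the code above relies on: the view has a fixed element type ('q', a signed 64-bit integer on mainstream platforms, matching int64_t) and it reads the original buffer without copying it. The column name is hypothetical.

```python
from array import array

# A contiguous buffer of signed 64-bit integers, like an int64 column's data.
column = array('q', [1, 2, 1, 2, 1, 1, 0])

view = memoryview(column)          # wraps column's buffer; nothing is copied
print(view.format, view.itemsize)  # q 8

tail = view[4:]                    # slicing a memoryview does not copy either
column[4] = 9                      # change the underlying buffer in place...
print(tail[0], len(tail))          # 9 3  (the view sees the change)
```

The element type matters in the Cython version too: an int64_t[:] view rejects a buffer of a different dtype with a ValueError, so both columns passed to sum must be int64.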

The function continues by dividing the rows semi-equally among the threads (here hardcoded to 4) and having each thread sum the entries in its range:

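    cdef uint64_t num_threads = 4
    cdef uint64_t l = len(crit)
    # Rows per thread: floor division plus one, so the chunks cover all l rows.
    # (`//` keeps this integer division; with language_level=3, `/` on C
    # integers is true division.)
    cdef uint64_t s = l // num_threads + 1
    cdef uint64_t i, j, e
    cdef counts_vec_t counts
    counts = counts_vec_t(num_threads)  # one empty map per thread
    with cython.boundscheck(False):
        # Iterations may run concurrently; iteration i writes only to
        # counts[i], so no locking is needed.
        for i in prange(num_threads, nogil=True, num_threads=num_threads):
            j = i * s
            e = j + s
            if e > l:
                e = l
            while j < e:
                counts[i][crit_view[j]] += vals_view[j]
                inc(j)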
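
The index arithmetic of that loop can be checked in isolation. This Python sketch (chunk_ranges is a hypothetical helper) reproduces the start/end computation of each prange iteration. Because s is l // num_threads + 1, the chunks always cover every row, but the last thread may receive a short or even empty range.

```python
def chunk_ranges(l, num_threads):
    """Half-open [start, end) row ranges, computed as in the prange loop."""
    s = l // num_threads + 1            # chunk size
    ranges = []
    for i in range(num_threads):
        j = i * s
        e = min(j + s, l)               # clamp the last chunk to the row count
        # If j is already past l, the Cython while loop does nothing;
        # represent that here as an empty range (j, j).
        ranges.append((j, max(j, e)))
    return ranges

print(chunk_ranges(7, 4))  # [(0, 2), (2, 4), (4, 6), (6, 7)]
print(chunk_ranges(8, 4))  # [(0, 3), (3, 6), (6, 8), (9, 9)]
```

For the 7-row example, each thread gets two rows except the last, which gets one; for 8 rows, the fourth thread has nothing to do.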

When the threads have completed, the function merges all the results (from the different ranges) into a single unordered_map:

    # Merge the per-thread maps into one (single-threaded, after the
    # prange loop has finished)
    cdef counts_t total
    cdef counts_it_t it, e_it
    for i in range(num_threads):
        it = counts[i].begin()
        e_it = counts[i].end()
        while it != e_it:
            total[deref(it).first] += deref(it).second
            inc(it)
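
The merge corresponds to adding up per-thread dictionaries. In this minimal Python sketch, the partial sums are the ones the usage example would produce with rows split as [0, 2), [2, 4), [4, 6), [6, 7); the Counter-based approach is a stand-in for the C++ loop, not the extension's code.

```python
from collections import Counter

# Per-thread partial sums for the usage example (one Counter per thread).
counts = [Counter({1: 0, 2: 1}), Counter({1: 2, 2: 3}), Counter({1: 9}), Counter({0: 6})]

total = Counter()
for partial in counts:      # runs on one thread, after all workers are done
    total.update(partial)   # Counter.update adds values instead of replacing them

print(dict(sorted(total.items())))  # {0: 6, 1: 11, 2: 4}
```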

All that's left is to create a DataFrame and return the results:

    key, sum_ = [], []
    it = total.begin()
    e_it = total.end()
    while it != e_it:
        key.append(deref(it).first)
        sum_.append(deref(it).second)
        inc(it)

    df = pd.DataFrame({'key': key, 'sum': sum_})
    df.set_index('key', inplace=True)
    # unordered_map iteration order is unspecified, so sort by key
    return df.sort_index()
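
Putting the steps together, the following pure-Python sketch mirrors the structure of the extension: partition the rows, sum each partition into a private dict on its own thread, then merge and sort. It is an analogue under stated assumptions, not a substitute: parallel_groupby_sum is a hypothetical name, and because Python threads share the GIL, it shows the shape of the algorithm without the speed-up that nogil + OpenMP provides.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

def parallel_groupby_sum(crit, vals, num_threads=4):
    """Partition rows, sum each partition into a private dict, then merge."""
    l = len(crit)
    s = l // num_threads + 1                  # same chunking as the Cython loop

    def work(i):
        local = defaultdict(int)              # this worker's private map
        for j in range(i * s, min(i * s + s, l)):
            local[crit[j]] += vals[j]
        return local

    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        partials = list(pool.map(work, range(num_threads)))

    total = defaultdict(int)
    for part in partials:                     # single-threaded merge step
        for key, value in part.items():
            total[key] += value
    # Sort by key because hash-map iteration order is unspecified.
    return sorted(total.items())

print(parallel_groupby_sum([1, 2, 1, 2, 1, 1, 0], list(range(7))))
# [(0, 6), (1, 11), (2, 4)]
```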
