Constructing Mode and Corresponding Count Functions Using Custom Aggregation Functions for GroupBy in Dask


Question

So dask has now been updated to support custom aggregation functions for groupby. (Thanks to the dev team and @chmp for working on this!) I am currently trying to construct a mode function and a corresponding count function. Basically, what I envision is that mode returns, for each grouping, a list of the most common values for a specific column (i.e. [4, 1, 2]). Additionally, there is a corresponding count function that returns the number of instances of those values, i.e. 3.

Now I am currently trying to implement this in code. As per the groupby.py file, the parameters for custom aggregations are as follows:

Parameters
    ----------
    name : str
        the name of the aggregation. It should be unique, since intermediate
        results will be identified by this name.
    chunk : callable
        a function that will be called with the grouped column of each
        partition. It can either return a single series or a tuple of series.
        The index has to be equal to the groups.
    agg : callable
        a function that will be called to aggregate the results of each chunk.
        Again the argument(s) will be grouped series. If ``chunk`` returned a
        tuple, ``agg`` will be called with all of them as individual positional
        arguments.
    finalize : callable
        an optional finalizer that will be called with the results from the
        aggregation.

Here is the provided code for mean:

    custom_mean = dd.Aggregation(
        'custom_mean',
        lambda s: (s.count(), s.sum()),
        lambda count, sum: (count.sum(), sum.sum()),
        lambda count, sum: sum / count,
    )
    df.groupby('g').agg(custom_mean)

I am trying to think of the best way to do this. Currently I have the following functions:

from collections import Counter

def custom_count(x):
    # tally how often each value occurs
    count = Counter(x)
    # list() is needed: dict views have no .count method
    freq_list = list(count.values())
    max_cnt = max(freq_list)
    # number of distinct values tied for the highest frequency
    total = freq_list.count(max_cnt)
    return count.most_common(total)

custom_mode = dd.Aggregation(
    'custom_mode',
    lambda s: custom_count(s),
    lambda s1: s1.extend(),
    lambda s2: ......
)

However, I am getting stuck on understanding how exactly the agg part should work. Any help on this problem would be appreciated.

Thanks!

Solution

Admittedly, the docs are currently somewhat light on detail. Thanks for bringing this issue to my attention. Please let me know if this answer helps and I will contribute an updated version of the docs to dask.

To your question: for a single return value, the different steps of the aggregation are equivalent to:

res = chunk(df.groupby('g')['col'])
res = agg(res.groupby(level=[0]))
res = finalize(res)
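For intuition, here is a minimal sketch of those three stages in plain pandas, spelling out the count/sum/divide steps of the custom_mean example from the question (the toy data and names are illustrative only):

import pandas as pd

df = pd.DataFrame({'g': [0, 0, 1, 1], 'col': [1.0, 2.0, 3.0, 5.0]})

# chunk: called with the grouped column of each partition
grouped = df.groupby('g')['col']
count, total = grouped.count(), grouped.sum()

# agg: called with the regrouped per-partition results
count = count.groupby(level=0).sum()
total = total.groupby(level=0).sum()

# finalize: called with the aggregated results
print(total / count)  # per-group mean: g=0 -> 1.5, g=1 -> 4.0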

In these terms, the mode function could be implemented as follows:

def chunk(s):
    # for the comments, assume only a single grouping column; the
    # implementation can handle multiple group columns.
    #
    # s is a grouped series. value_counts creates a multi-index series like
    # (group, value): count
    return s.value_counts()
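As a small illustration (toy data assumed for this sketch), the intermediate that chunk produces on one partition looks like this:

import pandas as pd

part = pd.DataFrame({'g': [0, 0, 1, 1], 'col': [1, 1, 2, 3]})
print(part.groupby('g')['col'].value_counts())
# g  col
# 0  1      2
# 1  2      1
#    3      1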


def agg(s):
    # s is a grouped multi-index series. In .apply the full sub-df will be
    # passed, multi-index and all. Group on the value level and sum the
    # counts. The result of the lambda function is a series, so the result
    # of the apply is a multi-index series like (group, value): count
    return s.apply(lambda s: s.groupby(level=-1).sum())


def agg_fast(s):
    # faster version using pandas internals; pass this instead of agg to
    # dd.Aggregation below for the same result
    s = s._selected_obj
    return s.groupby(level=list(range(s.index.nlevels))).sum()


def finalize(s):
    # s is a multi-index series of the form (group, value): count. First
    # manually group on the group part of the index. The lambda will receive a
    # sub-series with multi index. Next, drop the group part from the index.
    # Finally, take the index label with the maximum count, i.e., the mode
    # (idxmax returns the label; argmax would return the position).
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
        .apply(lambda s: s.reset_index(level=level, drop=True).idxmax())
    )

mode = dd.Aggregation('mode', chunk, agg, finalize)

Note that this implementation does not match the dataframe .mode function in the case of ties: it will return just one of the tied values instead of all of them.
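If the list-of-ties behaviour sketched in the question is wanted, one option (my own hedged sketch, not part of the original answer; it assumes the same (group, value): count intermediate) is an alternative finalizer that keeps every value whose count equals the maximum:

def finalize_ties(s):
    # same intermediate as in finalize: (group, value): count
    level = list(range(s.index.nlevels - 1))

    def modes(sub):
        # drop the group part of the index, then keep all values
        # that are tied for the maximum count
        sub = sub.reset_index(level=level, drop=True)
        return list(sub[sub == sub.max()].index)

    return s.groupby(level=level).apply(modes)


mode_all = dd.Aggregation('mode_all', chunk, agg, finalize_ties)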

The mode aggregation can now be used as in:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({
    'col': [0, 1, 1, 2, 3] * 10,
    'g0': [0, 0, 0, 1, 1] * 10,
    'g1': [0, 0, 0, 1, 1] * 10,
})
ddf = dd.from_pandas(df, npartitions=10)

res = ddf.groupby(['g0', 'g1']).agg({'col': mode}).compute()
print(res)
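The question also asked for the corresponding count, i.e. how often the mode occurs. Reusing chunk and agg from above, a hedged sketch (my assumption, not from the original answer) only needs a finalizer that returns the largest count per group instead of the value that attains it:

def finalize_count(s):
    # same intermediate as in finalize: (group, value): count;
    # the largest count per group is the frequency of the mode
    level = list(range(s.index.nlevels - 1))
    return s.groupby(level=level).max()


mode_count = dd.Aggregation('mode_count', chunk, agg, finalize_count)

res = ddf.groupby(['g0', 'g1']).agg({'col': mode_count}).compute()
print(res)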
