Dask DataFrame: Resample over groupby object with multiple rows


Problem Description

I have the following dask dataframe created from Castra:

import dask.dataframe as dd

df = dd.from_castra('data.castra', columns=['user_id','ts','text'])

Yielding:

                      user_id / ts                  / text
ts
2015-08-08 01:10:00   9235      2015-08-08 01:10:00   a
2015-08-08 02:20:00   2353      2015-08-08 02:20:00   b
2015-08-08 02:20:00   9235      2015-08-08 02:20:00   c
2015-08-08 04:10:00   9235      2015-08-08 04:10:00   d
2015-08-08 08:10:00   2353      2015-08-08 08:10:00   e

What I'd like to do is:

  1. Group by user_id and ts
  2. Resample it over a 3-hour period
  3. In the resampling step, any merged rows should concatenate the texts

Example output:

                                text
user_id   ts
9235      2015-08-08 00:00:00   ac
          2015-08-08 03:00:00   d
2353      2015-08-08 00:00:00   b
          2015-08-08 06:00:00   e

I tried the following:

df.groupby(['user_id','ts'])['text'].sum().resample('3H', how='sum').compute()

and got the following error:

TypeError: Only valid with DatetimeIndex, TimedeltaIndex or PeriodIndex
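
The error appears to come from the shape of the index: groupby(['user_id','ts'])['text'].sum() returns a Series with a (user_id, ts) MultiIndex, and resample only works on a datetime-like index. A minimal pandas-only illustration with hypothetical two-row data (not from the real dataset):

import pandas as pd

frame = pd.DataFrame({
    'user_id': [9235, 2353],
    'ts': pd.to_datetime(['2015-08-08 01:10:00', '2015-08-08 02:20:00']),
    'text': ['a', 'b'],
})

summed = frame.groupby(['user_id', 'ts'])['text'].sum()
print(summed.index)    # MultiIndex with levels ['user_id', 'ts'], not a DatetimeIndex
summed.resample('3H')  # raises TypeError: Only valid with DatetimeIndex, ...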

I tried passing set_index('ts') in the pipe but it doesn't seem to be an attribute of Series.

Any ideas on how to achieve this?

TL;DR

If it makes the problem easier, I'm also able to change the format of the Castra DB I created. The implementation I have currently was largely taken from this great post.

I set the index (in the to_df() function) as follows:

df.set_index('ts',drop=False,inplace=True)

and have:

with BZ2File(os.path.join(S.DATA_DIR, filename)) as f:
    batches = partition_all(batch_size, f)
    df, frames = peek(map(self.to_df, batches))
    castra = Castra(S.CASTRA, template=df, categories=categories)
    castra.extend_sequence(frames, freq='3h')
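
For reference, partition_all and peek come from toolz (cytoolz provides the same functions), and Castra is the class exported by the castra package. A minimal sketch of the imports this fragment assumes; the surrounding class, self.to_df(), and the S settings module come from the linked post and are not shown here:

import os
from bz2 import BZ2File

from castra import Castra
from toolz import partition_all, peek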

Here are the resulting dtypes:

ts                datetime64[ns]
text                      object
user_id                  float64

Recommended Answer

If we can assume that each user-id group can fit in memory then I recommend using dask.dataframe to do the outer-groupby but then using pandas to do the operations within each group, something like the following.

def per_group(blk):
    # blk is the pandas DataFrame holding all of the rows for a single user_id
    return blk.groupby('ts').text.resample('3H', how='sum')

df.groupby('user_id').apply(per_group, columns=['ts', 'text']).compute()

This decouples two hard things into the two different projects:

  1. Getting all of the user-ids into the right groups is handled by dask.dataframe
  2. Performing the complex datetime resampling within each group is handled explicitly by pandas

Ideally dask.dataframe would write the per-group function for you automatically. At the moment dask.dataframe does not intelligently handle multi-indexes, or resampling on top of multi-column groupbys, so the automatic solution isn't yet available. Still, it's quite possible to fall back to pandas for the per-block computation while still using dask.dataframe to prepare the groups accordingly.
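
As a side note, dd.from_castra and the how= keyword to resample have both since been removed, so on a recent dask/pandas stack the same split looks roughly like the sketch below. The per_group name, the ''.join aggregation (standing in for how='sum' on a text column), and the meta= hint (which replaced the columns= argument used above) are assumptions on my part rather than part of the original answer:

import pandas as pd
import dask.dataframe as dd

def per_group(blk: pd.DataFrame) -> pd.Series:
    # blk holds all rows for one user_id: index by timestamp, cut into
    # 3-hour bins, and concatenate the text values that land in the same bin
    return blk.set_index('ts')['text'].resample('3H').agg(''.join)

# meta describes the per-group output so dask can build the task graph lazily
result = (
    df.groupby('user_id')
      .apply(per_group, meta=('text', 'object'))
      .compute()
)

The computed result should be a pandas Series of concatenated texts with a (user_id, ts) MultiIndex, matching the example output above.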
