Dask DataFrame: Resample over groupby object with multiple rows
Question
I have the following dask dataframe created from Castra:
import dask.dataframe as dd
df = dd.from_castra('data.castra', columns=['user_id','ts','text'])
which yields:
                     user_id                  ts text
ts
2015-08-08 01:10:00     9235 2015-08-08 01:10:00    a
2015-08-08 02:20:00     2353 2015-08-08 02:20:00    b
2015-08-08 02:20:00     9235 2015-08-08 02:20:00    c
2015-08-08 04:10:00     9235 2015-08-08 04:10:00    d
2015-08-08 08:10:00     2353 2015-08-08 08:10:00    e
What I want to do is:
- Group by user_id and ts
- Resample it over a 3-hour period
- In the resampling step, any merged rows should concatenate the texts
Example output:
                             text
user_id ts
9235    2015-08-08 00:00:00    ac
        2015-08-08 03:00:00     d
2353    2015-08-08 00:00:00     b
        2015-08-08 06:00:00     e
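For reference, the target semantics are easy to express in plain pandas on a small in-memory frame. This is only a sketch with toy data mirroring the sample above (the frame name `toy` is made up, and ''.join is used to make the concatenation explicit):

import pandas as pd

# Toy frame mirroring the sample data above (illustration only)
toy = pd.DataFrame({
    'user_id': [9235, 2353, 9235, 9235, 2353],
    'ts': pd.to_datetime(['2015-08-08 01:10:00', '2015-08-08 02:20:00',
                          '2015-08-08 02:20:00', '2015-08-08 04:10:00',
                          '2015-08-08 08:10:00']),
    'text': list('abcde'),
})

out = (toy.set_index('ts')
          .groupby('user_id')['text']
          .resample('3H')
          .apply(''.join))
print(out[out != ''])  # drop the empty 3-hour bins that resample inserts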
I tried the following:
df.groupby(['user_id','ts'])['text'].sum().resample('3H', how='sum').compute()
and got the following error:
TypeError: Only valid with DatetimeIndex, TimedeltaIndex or PeriodIndex
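This error is expected in pandas terms: the two-column groupby produces a result indexed by a (user_id, ts) MultiIndex, and resample only works on a Datetime-, Timedelta- or PeriodIndex. A minimal illustration on a toy frame (names made up for this sketch):

import pandas as pd

toy = pd.DataFrame({
    'user_id': [9235, 2353],
    'ts': pd.to_datetime(['2015-08-08 01:10:00', '2015-08-08 02:20:00']),
    'text': ['a', 'b'],
})

summed = toy.groupby(['user_id', 'ts'])['text'].sum()
print(summed.index)    # MultiIndex of (user_id, ts) pairs, not a DatetimeIndex
summed.resample('3H')  # raises the TypeError above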
I tried passing set_index('ts') in the pipe but it doesn't seem to be an attribute of Series.
Any ideas on how to achieve this?
TL;DR
If it makes the problem easier, I'm also able to change the format of the Castra DB I created. The implementation I have currently was largely taken from this great post.
I set the index (in the to_df() function) as follows:
df.set_index('ts',drop=False,inplace=True)
and have the following:
import os
from bz2 import BZ2File
from castra import Castra
from toolz import partition_all, peek

with BZ2File(os.path.join(S.DATA_DIR, filename)) as f:
    batches = partition_all(batch_size, f)       # chunk the file into batches
    df, frames = peek(map(self.to_df, batches))  # first frame serves as template
    castra = Castra(S.CASTRA, template=df, categories=categories)
    castra.extend_sequence(frames, freq='3h')
Here are the resulting dtypes:
ts         datetime64[ns]
text               object
user_id           float64
Answer
If we can assume that each user-id group can fit in memory, then I recommend using dask.dataframe to do the outer groupby, but then using pandas to do the operations within each group, something like the following.
def per_group(blk):
    # blk is the pandas DataFrame for a single user_id
    return blk.groupby('ts').text.resample('3H', how='sum')

df.groupby('user_id').apply(per_group, columns=['ts', 'text']).compute()
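A side note for readers on newer library versions: the how= keyword was later removed from pandas' resample, and recent dask releases spell the columns= hint as meta=. A hedged sketch of the same idea against current APIs (df is the dask dataframe from the question; ''.join makes the concatenation explicit):

import pandas as pd

def per_group(blk):
    # blk is the pandas DataFrame for one user_id, already indexed by ts
    # (the Castra index), so it can be resampled directly
    return blk['text'].resample('3H').apply(''.join)

# meta describes the output (name, dtype); it replaces the old columns= hint
result = df.groupby('user_id').apply(per_group, meta=('text', 'object')).compute()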
This decouples two hard things into two different projects:
- Shuffling all of the user-ids into the correct groups is handled by dask.dataframe
- Performing the complex datetime resampling within each group is handled explicitly by pandas
Ideally dask.dataframe would write the per-group function for you automatically. At the moment dask.dataframe does not intelligently handle multi-indexes, or resampling on top of multi-column groupbys, so the automatic solution isn't yet available. Still, it's quite possible to fall back to pandas for the per-block computation while still using dask.dataframe to prepare the groups accordingly.