dask dataframes - time series partitions


Problem description

I have a time series Pandas dataframe that I want to partition by month and year. My thought was to get a list of datetimes to serve as the index, but the breaks don't happen at 0:00 on the first of the month.

monthly_partitions = np.unique(df.index.values.astype('datetime64[M]')).tolist()
da = dd.from_pandas(df, npartitions=1)

How do I set the partitions to start at each month? I tried npartitions=len(monthly_partitions), but I realize that is wrong, since it may not partition on the start dates. How should one ensure it partitions on the first date of each month?
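One way to get boundaries that do fall at 0:00 on the first of each month is to round the index down to month precision via periods. A minimal pandas-only sketch (variable names are my own, not from the question):

```python
import numpy as np
import pandas as pd

# Sample 10-minute time series, standing in for the question's dataframe.
ts = pd.date_range("2015-01-01 00:00", "2015-05-01 23:50", freq="10min")
df = pd.DataFrame(np.random.randint(0, 100, size=(len(ts), 4)),
                  columns=list("ABCD"), index=ts)

# Collapse each timestamp to its month, then convert back to timestamps:
# every resulting boundary is day 1 at 00:00 of some month in the data.
month_starts = df.index.to_period("M").unique().to_timestamp().tolist()
```

These month-start timestamps are the natural candidates for dask divisions, since a division list must begin and end on actual boundary values.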

Update:

Using da = da.repartition(freq='1M') effectively repartitioned the 10-minute data into one-minute partitions; see below.

Dask DataFrame Structure:
Open    High    Low Close   Vol OI  VI  
npartitions=5037050                             
2008-05-04 18:00:00 float64 float64 float64 float64 int64   int64   float64 int32
2008-05-04 18:01:00 ... ... ... ... ... ... ... ...
... ... ... ... ... ... ... ... ...
2017-12-01 16:49:00 ... ... ... ... ... ... ... ...
2017-12-01 16:50:00 ... ... ... ... ... ... ... ...
Dask Name: repartition-merge, 10074101 tasks
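As a sanity check (my addition, not in the original thread), the npartitions=5037050 shown above is exactly the number of one-minute steps between the first and last index values, confirming that '1M' was read as one minute rather than one month:

```python
import pandas as pd

# First and last index values from the structure printout above.
start = pd.Timestamp("2008-05-04 18:00:00")
end = pd.Timestamp("2017-12-01 16:50:00")

# Number of whole one-minute steps between them.
n_minutes = (end - start) // pd.Timedelta(minutes=1)
```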

Update 2:

Here is code that reproduces the problem:

import pandas as pd
import datetime as dt
import dask as dsk
import numpy as np
import dask.dataframe as dd

ts = pd.date_range("2015-01-01 00:00", "2015-05-01 23:50", freq="10min")
df = pd.DataFrame(np.random.randint(0, 100, size=(len(ts), 4)), columns=list('ABCD'), index=ts)
ddf = dd.from_pandas(df, npartitions=1)
ddf = ddf.repartition(freq='1M')
ddf

Answer

Assuming your dataframe is already indexed by time, you should be able to use the repartition method to accomplish this:

df = df.repartition(freq='1M')

Edit after MCVE above

(Thanks for adding the minimal and complete example!)

Interesting, this looks like a bug, either in Pandas or Dask. I assumed that '1M' would mean one month, as it does in pd.date_range:

In [12]: pd.date_range('2017-01-01', '2017-12-15', freq='1M')
Out[12]: 
DatetimeIndex(['2017-01-31', '2017-02-28', '2017-03-31', '2017-04-30',
               '2017-05-31', '2017-06-30', '2017-07-31', '2017-08-31',
               '2017-09-30', '2017-10-31', '2017-11-30'],
              dtype='datetime64[ns]', freq='M')

And yet, when passed to pd.Timedelta, it means one minute:

In [13]: pd.Timedelta('1M')
Out[13]: Timedelta('0 days 00:01:00')

In [14]: pd.Timedelta('1m')
Out[14]: Timedelta('0 days 00:01:00')

So it's hanging because it's trying to make around 43,200 times more partitions than you intended :)
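The factor follows directly from the minute reading of '1M': a 30-day month contains 43,200 minutes (my illustration, not from the thread):

```python
import pandas as pd

# With '1M' parsed as one minute, a single 30-day month would be split
# into one partition per minute instead of one partition total.
minutes_per_month = pd.Timedelta(days=30) // pd.Timedelta(minutes=1)
```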

We should file a bug report for this (do you have any interest in doing so?). A short-term workaround is to specify the divisions yourself explicitly:

In [17]: divisions = pd.date_range('2015-01-01', '2015-05-01', freq='1M').tolist()
    ...: divisions[0] = ddf.divisions[0]
    ...: divisions[-1] = ddf.divisions[-1]
    ...: ddf.repartition(divisions=divisions)
Out[17]: 
Dask DataFrame Structure:
                         A      B      C      D
npartitions=3                                  
2015-01-01 00:00:00  int64  int64  int64  int64
2015-02-28 00:00:00    ...    ...    ...    ...
2015-03-31 00:00:00    ...    ...    ...    ...
2015-05-01 23:50:00    ...    ...    ...    ...
Dask Name: repartition-merge, 7 tasks
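A variant of this workaround (my sketch, not part of the original answer) uses the 'MS' (month start) offset alias instead of 'M', so every interior boundary lands at 00:00 on the first of a month rather than at month ends. Pandas-only here, with the dask call left as a comment:

```python
import pandas as pd

# The span covered by the MCVE's index.
start = pd.Timestamp("2015-01-01 00:00")
end = pd.Timestamp("2015-05-01 23:50")

# 'MS' yields timestamps at 00:00 on the 1st of each month,
# unlike 'M', which yields month-end dates.
divisions = pd.date_range(start, end, freq="MS").tolist()

# Dask divisions must cover the full index, so extend the final
# boundary to the last timestamp in the data.
if divisions[-1] < end:
    divisions.append(end)

# With a dask dataframe ddf indexed over this range:
#   ddf = ddf.repartition(divisions=divisions)
```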
