Python PANDAS: Converting from pandas/numpy to dask dataframe/array
Question
I am trying to convert a program to be parallelizable/multithreaded using the excellent dask library. Here is the program I am converting:
import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask.array as da
from io import StringIO
test_data = '''id,transaction_dt,units,measures
1,2018-01-01,4,30.5
1,2018-01-03,4,26.3
2,2018-01-01,3,12.7
2,2018-01-03,3,8.8'''
df_test = pd.read_csv(StringIO(test_data), sep=',')
df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])
df_test = df_test.loc[np.repeat(df_test.index, df_test['units'])]
df_test['transaction_dt'] += pd.to_timedelta(df_test.groupby(level=0).cumcount(), unit='d')
df_test = df_test.reset_index(drop=True)
Expected results:
id,transaction_dt,measures
1,2018-01-01,30.5
1,2018-01-02,30.5
1,2018-01-03,30.5
1,2018-01-04,30.5
1,2018-01-03,26.3
1,2018-01-04,26.3
1,2018-01-05,26.3
1,2018-01-06,26.3
2,2018-01-01,12.7
2,2018-01-02,12.7
2,2018-01-03,12.7
2,2018-01-03,8.8
2,2018-01-04,8.8
2,2018-01-05,8.8
It occurred to me that this might be a good candidate to try to parallelize because the separate dask partitions should not need to know anything about each other to accomplish the required operations. Here is a naive representation of how I thought it might work:
dd_test = dd.from_pandas(df_test, npartitions=3)
dd_test = dd_test.loc[da.repeat(dd_test.index, dd_test['units'])]
dd_test['transaction_dt'] += dd_test.to_timedelta(dd.groupby(level=0).cumcount(), unit='d')
dd_test = dd_test.reset_index(drop=True)
So far I have been trying to work through the following errors or idiomatic differences:
1. "NotImplementedError: Only integer valued repeats supported." I have tried converting the index into an int column/array as well, but I still run into the issue.
2. dask does not support the mutation operator "+=".
3. There is no dask .to_timedelta().
4. There is no dask .cumcount() (but I think .cumsum() is interchangeable?!)
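The repeat limitation in point 1 is easy to reproduce in isolation. A minimal sketch (not from the original post) contrasting the two behaviours; dask's repeat accepts only a single integer count, while np.repeat also accepts per-element counts:

```python
import numpy as np
import dask.array as da

x = da.from_array(np.array([10, 20, 30]), chunks=2)

# A single integer repeat count is supported by dask...
repeated = da.repeat(x, 2, axis=0).compute()

# ...but per-element repeat counts, which np.repeat allows, trigger the
# "Only integer valued repeats supported" NotImplementedError in dask, so
# they have to be done on materialized numpy arrays:
per_element = np.repeat(np.array([10, 20, 30]), np.array([1, 2, 3]))
```

This is why the accepted answer below falls back to np.repeat on numpy arrays for the row expansion.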
If there are any dask experts out there who might be able to let me know whether there are fundamental impediments that preclude me from trying this, or who have any tips on implementation, that would be a great help!
I think I have made a bit of progress on this since posting the question:
dd_test = dd.from_pandas(df_test, npartitions=3)
dd_test['helper'] = 1
dd_test = dd_test.loc[da.repeat(dd_test.index, dd_test['units'])]
dd_test['transaction_dt'] = dd_test['transaction_dt'] + (dd_test.groupby('id')['helper'].cumsum()).astype('timedelta64[D]')
dd_test = dd_test.reset_index(drop=True)
However, I am still stuck on the dask array repeats error. Any tips still welcome.
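The helper-column trick above rests on the equivalence between cumcount() and a cumsum() over a column of ones, which can be checked in plain pandas (a minimal sketch, not from the original post):

```python
import pandas as pd

df = pd.DataFrame({'id': [1, 1, 1, 2, 2]})
df['helper'] = 1

# cumsum() over the helper column counts 1, 2, 3, ... within each group,
# so subtracting 1 reproduces cumcount(), which counts from 0
emulated = df.groupby('id')['helper'].cumsum() - 1
reference = df.groupby('id').cumcount()
```

Note the off-by-one: using cumsum() directly, as the progress snippet above does, shifts every offset forward by one relative to cumcount().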
Answer
Not sure if this is exactly what you are looking for, but I replaced da.repeat with np.repeat, explicitly cast dd_test.index and dd_test['units'] to numpy arrays, and finally added dd_test['transaction_dt'].astype('M8[us]') to your timedelta calculation.
df_test = pd.read_csv(StringIO(test_data), sep=',')
dd_test = dd.from_pandas(df_test, npartitions=3)

# Helper column of ones whose per-group cumsum stands in for cumcount()
dd_test['helper'] = 1

# Materialize the index and repeat counts as numpy arrays so np.repeat
# can be used in place of the unsupported da.repeat
dd_test = dd_test.loc[np.repeat(np.array(dd_test.index),
                                np.array(dd_test['units']))]

# Cast to datetime before adding the per-group day offsets
dd_test['transaction_dt'] = dd_test['transaction_dt'].astype('M8[us]') + (dd_test.groupby('id')['helper'].cumsum()).astype('timedelta64[D]')
dd_test = dd_test.reset_index(drop=True)
df_expected = dd_test.compute()