Improve Query Performance From a Large HDFStore Table with Pandas
Question
I have a large (~160 million rows) dataframe that I've stored to disk with something like this:
def fillStore(store, tablename):
    files = glob.glob('201312*.csv')
    names = ["ts", "c_id", "f_id", "resp_id", "resp_len", "s_id"]
    for f in files:
        df = pd.read_csv(f, parse_dates=True, index_col=0, names=names)
        store.append(tablename, df, format='table', data_columns=['c_id', 'f_id'])
The table has a time index, and I will query using c_id and f_id in addition to times (via the index).
I have another dataframe containing ~18000 "incidents." Each incident consists of some (as few as hundreds, as many as hundreds of thousands) individual records. I need to collect some simple statistics for each incident and store them in order to collect some aggregate statistics. Currently I do this like so:
def makeQueryString(c, f, start, stop):
    return "c_id == {} & f_id == {} & index >= Timestamp('{}') & index < Timestamp('{}')".format(
        c, f, str(pd.to_datetime(start)), str(pd.to_datetime(stop)))

def getIncidents(inc_times, store, tablename):
    incidents = pd.DataFrame(columns=['c_id', 'f_id', 'resp_id', 'resp_len', 's_id', 'incident_id'])
    for ind, row in inc_times.iterrows():
        incidents = incidents.append(store.select(tablename,
                                                  makeQueryString(row.c_id,
                                                                  row.f_id,
                                                                  row.start,
                                                                  row.stop))).fillna(ind)
    return incidents
This all works fine except for the fact that each store.select() statement takes roughly 5 seconds, which means that processing the full month's worth of data requires somewhere between 24-30 hours of processing. Meanwhile, the actual statistics I need are relatively simple:
def getIncidentStats(df):
    incLen = (df.index[-1] - df.index[0]).total_seconds()
    if incLen == 0:
        incLen = .1
    rqsts = len(df)
    rqstRate_s = rqsts / incLen
    return pd.Series({'c_id': df.c_id[0],
                      'f_id': df.f_id[0],
                      'Length_sec': incLen,
                      'num_rqsts': rqsts,
                      'rqst_rate': rqstRate_s,
                      'avg_resp_size': df.resp_len.mean(),
                      'std_resp_size': df.resp_len.std()})
incs = getIncidents(i_times, store, tablename)
inc_groups = incs.groupby('incident_id')
inc_stats = inc_groups.apply(getIncidentStats)
My question is: how can I improve the performance or efficiency of any part of this workflow? (Please note that I actually batch most of the jobs to get and store incidents one day at a time, simply because I want to limit the risk of losing already-processed data in the event of a crash. I left this code out here for simplicity and because I actually need to process the whole month's data.)
Is there a way to process the data as I receive it from the store, and is there any benefit to this? Would I benefit from using store.select_as_index? If I receive an index, I'd still need to access the data to get the statistics, correct?
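One hedged sketch of the "process the data as I receive it" idea (not from the original post): HDFStore.select accepts a chunksize= argument, which returns an iterator of DataFrames so running statistics can be accumulated chunk by chunk instead of materializing one big result. The tiny stand-in store, the 'recs' table name, and the file name here are all hypothetical:

```python
import numpy as np
import pandas as pd

# Tiny stand-in for the real ~160M-row store (hypothetical file/table names).
df = pd.DataFrame({'c_id': np.repeat([1, 2], 50),
                   'f_id': np.tile([7, 8], 50),
                   'resp_len': np.arange(100)},
                  index=pd.date_range('2013-12-04', periods=100, freq='s'))
with pd.HDFStore('tiny.h5', mode='w') as store:
    store.append('recs', df, format='table', data_columns=['c_id', 'f_id'])

# Stream the matching rows chunk by chunk instead of one big select,
# accumulating running statistics as each chunk arrives.
n_rows, total_len = 0, 0
with pd.HDFStore('tiny.h5', mode='r') as store:
    for chunk in store.select('recs', where='c_id == 1', chunksize=25):
        n_rows += len(chunk)
        total_len += int(chunk.resp_len.sum())

print(n_rows, total_len)  # → 50 1225
```

Whether this beats one large select depends on the result size; for sums, counts, and means it at least bounds peak memory.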
Other notes/questions: I have compared the performance of storing my HDFStore on both a SSD and normal hard drive and didn't notice any improvement for the SSD. Is this expected?
I also toyed with the idea of creating a large conjunction of query strings and asking for them all at once. This causes memory errors when the total query string is too large (~5-10 queries).
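Revisiting the "large conjunction" idea above as a hedged sketch (the helper name and the cap of 5 clauses are my own choices, not from the post): the where-string can be kept bounded by OR-ing only a capped number of per-incident clauses per select, which avoids the one-giant-string memory blowup:

```python
def make_union_queries(rows, max_terms=5):
    """Build '|'-joined where-strings, capped at max_terms clauses each,
    so no single query string (or result set) grows without bound."""
    template = ("(c_id == {} & f_id == {} & "
                "index >= Timestamp('{}') & index < Timestamp('{}'))")
    clauses = [template.format(c, f, start, stop) for c, f, start, stop in rows]
    return [' | '.join(clauses[i:i + max_terms])
            for i in range(0, len(clauses), max_terms)]

incidents = [(1, 7, '2013-12-04 08:00:00', '2013-12-04 08:05:00'),
             (2, 8, '2013-12-05 09:00:00', '2013-12-05 09:01:00'),
             (1, 9, '2013-12-06 10:00:00', '2013-12-06 10:02:00')]
qs = make_union_queries(incidents, max_terms=2)
print(len(qs))  # → 2: two clauses OR-ed in the first query, one in the second
```

Each string in qs could then be passed as the where= argument of a separate store.select call.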
Edit 1 If it matters, I am using tables version 3.1.0 and pandas version 0.13.1
Edit 2 Here is some more information:
ptdump -av store.h5
/ (RootGroup) ''
/._v_attrs (AttributeSet), 4 attributes:
[CLASS := 'GROUP',
PYTABLES_FORMAT_VERSION := '2.0',
TITLE := '',
VERSION := '1.0']
/all_recs (Group) ''
/all_recs._v_attrs (AttributeSet), 14 attributes:
[CLASS := 'GROUP',
TITLE := '',
VERSION := '1.0',
data_columns := ['c_id', 'f_id'],
encoding := None,
index_cols := [(0, 'index')],
info := {1: {'type': 'Index', 'names': [None]}, 'index': {'index_name': 'ts'}},
levels := 1,
nan_rep := 'nan',
non_index_axes := [(1, ['c_id', 'f_id', 'resp_id', 'resp_len', 'dns_server_id'])],
pandas_type := 'frame_table',
pandas_version := '0.10.1',
table_type := 'appendable_frame',
values_cols := ['values_block_0', 'c_id', 'f_id']]
/all_recs/table (Table(161738653,)) ''
description := {
"index": Int64Col(shape=(), dflt=0, pos=0),
"values_block_0": Int64Col(shape=(3,), dflt=0, pos=1),
"c_id": Int64Col(shape=(), dflt=0, pos=2),
"f_id": Int64Col(shape=(), dflt=0, pos=3)}
byteorder := 'little'
chunkshape := (5461,)
autoindex := True
colindexes := {
"index": Index(6, medium, shuffle, zlib(1)).is_csi=False,
"f_id": Index(6, medium, shuffle, zlib(1)).is_csi=False,
"c_id": Index(6, medium, shuffle, zlib(1)).is_csi=False}
/all_recs/table._v_attrs (AttributeSet), 19 attributes:
[CLASS := 'TABLE',
FIELD_0_FILL := 0,
FIELD_0_NAME := 'index',
FIELD_1_FILL := 0,
FIELD_1_NAME := 'values_block_0',
FIELD_2_FILL := 0,
FIELD_2_NAME := 'c_id',
FIELD_3_FILL := 0,
FIELD_3_NAME := 'f_id',
NROWS := 161738653,
TITLE := '',
VERSION := '2.6',
client_id_dtype := 'int64',
client_id_kind := ['c_id'],
fqdn_id_dtype := 'int64',
fqdn_id_kind := ['f_id'],
index_kind := 'datetime64',
values_block_0_dtype := 'int64',
values_block_0_kind := ['s_id', 'resp_len', 'resp_id']]
Here are samples of both the main table and inc_times:
In [12]: df.head()
Out[12]:
c_id f_id resp_id resp_len \
ts
2013-12-04 08:00:00 637092486 5372764353 30 56767543
2013-12-04 08:00:01 637092486 5399580619 23 61605423
2013-12-04 08:00:04 5456242 5385485460 21 46742687
2013-12-04 08:00:04 5456242 5385485460 21 49909681
2013-12-04 08:00:04 624791800 5373236646 14 70461449
s_id
ts
2013-12-04 08:00:00 1829
2013-12-04 08:00:01 1724
2013-12-04 08:00:04 1679
2013-12-04 08:00:04 1874
2013-12-04 08:00:04 1727
[5 rows x 5 columns]
In [13]: inc_times.head()
Out[13]:
c_id f_id start stop
0 7254 196211 1385880945000000000 1385880960000000000
1 9286 196211 1387259840000000000 1387259850000000000
2 16032 196211 1387743730000000000 1387743735000000000
3 19793 196211 1386208175000000000 1386208200000000000
4 19793 196211 1386211800000000000 1386211810000000000
[5 rows x 4 columns]
Regarding c_id and f_id, the set of IDs I want to select from the full store is relatively few compared to the total number of IDs in the store. In other words, there are some popular IDs in inc_times that I will repeatedly query while completely ignoring some of the IDs that exist in the full table. I'd estimate that the Ids I care about are roughly 10% of the total IDs, but that these are the most popular IDs so their records dominate the full set.
I have 16GB RAM. The full store is 7.4G and the full dataset (as a csv file) is only 8.7 GB. Initially I believed I would be able to load the whole thing in memory and at least do some limited operations on it, but I get memory errors on loading the whole thing. Hence, batching it into daily files (the full file consists of data for one month).
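One hedged mitigation for the memory pressure described above (not something tried in the original post): HDFStore.select also takes a columns= argument, so a statistic that only needs resp_len never has to bring resp_id or s_id back from disk. A minimal sketch with hypothetical file/table names:

```python
import pandas as pd

df = pd.DataFrame({'c_id': [1, 1, 2], 'f_id': [7, 7, 8],
                   'resp_id': [10, 11, 12], 'resp_len': [100, 200, 300],
                   's_id': [5, 5, 6]},
                  index=pd.date_range('2013-12-04', periods=3, freq='s'))
with pd.HDFStore('cols.h5', mode='w') as store:
    store.append('recs', df, format='table', data_columns=['c_id', 'f_id'])
    # Only the column the statistic needs is read back from disk.
    slim = store.select('recs', where='c_id == 1', columns=['resp_len'])

print(list(slim.columns), len(slim))  # → ['resp_len'] 2
```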
Answer

Here are some recommendations; a similar question is here.
Use compression: see here. You should try this (this could make it faster / slower depending on exactly what you are querying), YMMV.
ptrepack --chunkshape=auto --propindexes --complevel=9 --complib=blosc in.h5 out.h5
Use a hierarchical query in chunks. What I mean is this: since you have a relatively small number of c_id and f_id values that you care about, structure a single query something like the following. This is kind of like using isin.
f_ids = list_of_f_ids that I care about
c_ids = list_of_c_ids that I care about
def create_batches(l, maxn=32):
    """ create a list of batches, maxed at maxn """
    batches = []
    while True:
        if len(l) <= maxn:
            if len(l) > 0:
                batches.append(l)
            break
        batches.append(l[0:maxn])
        l = l[maxn:]
    return batches
results = []
for f_id_batch in create_batches(f_ids):
    for c_id_batch in create_batches(c_ids):
        q = "f_id={f_id} & c_id={c_id}".format(
            f_id=f_id_batch,
            c_id=c_id_batch)

        # you can include the max/min times in here as well (they would be the
        # max/min times for ALL the included batches though; maybe easy for you to compute)
        result = store.select('df', where=q)

        # sub-process this result
        def f(x):
            # you will need to filter out the min/max timestamps here (which I gather
            # are somewhat dependent on the f_id/c_id group)
            #### process the data and return something
            # e.g. something like this for simple stats:
            return x.describe()

        results.append(result.groupby(['f_id', 'c_id']).apply(f))

results = pd.concat(results)
The key here is to batch so that the isin does NOT have more than 32 members for any variable that you are querying on. This is an internal numpy/pytables limitation.
If you exceed this, the query will work, but it will drop that variable and do a reindex
on ALL the data (which is NOT what you want here).
This way you will have a nice subset of data in memory over just a few loops. These queries I think would take about the same time as most of your queries or so, but you will have way fewer.
The query time is roughly constant for a given subset (unless the data is ordered such that it is completely indexed).
So the query scans 'blocks' of data (which is what the indexes point to). If you have lots of hits across many blocks then the query is slower.
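Following on the "blocks" point, one hedged option (my suggestion, not from the answer) is to rebuild the data-column indexes as full CSI indexes via pandas' HDFStore.create_table_index, which can reduce the number of blocks a hot c_id/f_id query has to scan; whether it pays off depends on the data layout (ptrepack's --sortby is a related on-disk route). A small sketch with hypothetical names:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({'c_id': np.tile([0, 1, 2], 334)[:1000],
                   'A': np.random.randn(1000)},
                  index=pd.date_range('2013-01-01', periods=1000, freq='s'))
with pd.HDFStore('csi.h5', mode='w') as store:
    store.append('df', df, format='table', data_columns=['c_id'])
    # Rebuild the default (non-CSI) index on c_id as a full CSI index.
    store.create_table_index('df', columns=['c_id'], optlevel=9, kind='full')
    out = store.select('df', where='c_id == 1')

print(len(out))  # 333 rows with c_id == 1 in this synthetic table
```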
Here's an example
In [5]: N = 100000000
In [6]: df = DataFrame(np.random.randn(N,3),columns=['A','B','C'])
In [7]: df['c_id'] = np.random.randint(0,10,size=N)
In [8]: df['f_id'] = np.random.randint(0,10,size=N)
In [9]: df.index = date_range('20130101',periods=N,freq='s')
In [10]: df.to_hdf('test2.h5','df',mode='w',data_columns=['c_id','f_id'])
In [11]: df.head()
Out[11]:
A B C c_id f_id
2013-01-01 00:00:00 0.037287 1.153534 0.639669 8 7
2013-01-01 00:00:01 1.741046 0.459821 0.194282 8 3
2013-01-01 00:00:02 -2.273919 -0.141789 0.770567 1 1
2013-01-01 00:00:03 0.320879 -0.108426 -1.310302 8 6
2013-01-01 00:00:04 -1.445810 -0.777090 -0.148362 5 5
2013-01-01 00:00:05 1.608211 0.069196 0.025021 3 6
2013-01-01 00:00:06 -0.561690 0.613579 1.071438 8 2
2013-01-01 00:00:07 1.795043 -0.661966 1.210714 0 0
2013-01-01 00:00:08 0.176347 -0.461176 1.624514 3 6
2013-01-01 00:00:09 -1.084537 1.941610 -1.423559 9 1
2013-01-01 00:00:10 -0.101036 0.925010 -0.809951 0 9
2013-01-01 00:00:11 -1.185520 0.968519 2.871983 7 5
2013-01-01 00:00:12 -1.089267 -0.333969 -0.665014 3 6
2013-01-01 00:00:13 0.544427 0.130439 0.423749 5 7
2013-01-01 00:00:14 0.112216 0.404801 -0.061730 5 4
2013-01-01 00:00:15 -1.349838 -0.639435 0.993495 0 9
In [2]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1] & c_id=[2]")
1 loops, best of 3: 13.9 s per loop
In [3]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1,2] & c_id=[1,2]")
1 loops, best of 3: 21.2 s per loop
In [4]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1,2,3] & c_id=[1,2,3]")
1 loops, best of 3: 42.8 s per loop
This particular example is 5GB uncompressed and 2.9GB compressed. These results are on the compressed data. In THIS case it is actually quite a bit faster to use the uncompressed (e.g. the first loop took 3.5s). This is 100MM rows.
So using the last example (4) you are getting 9x the data of the first in a little over 3x the query time.
However your speedup should be MUCH more, because you won't be selecting on individual timestamps, rather doing that later.
This whole approach takes into account that you have enough main memory to hold your results in the batch sizes (e.g. you are selecting a relatively small part of the set in the batch queries).
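The "doing that later" step, i.e. carving individual incidents out of a batch result already in memory, can be sketched like this (a hedged illustration with made-up data; it assumes each incident's rows are identified by its (c_id, f_id) pair plus its start/stop window, as in the question):

```python
import numpy as np
import pandas as pd

# Stand-ins: one batch result already in memory, plus its incident windows.
result = pd.DataFrame({'c_id': 1, 'f_id': 7, 'resp_len': np.arange(10)},
                      index=pd.date_range('2013-12-04 08:00:00', periods=10, freq='s'))
inc_times = pd.DataFrame({'c_id': [1], 'f_id': [7],
                          'start': [pd.Timestamp('2013-12-04 08:00:02')],
                          'stop':  [pd.Timestamp('2013-12-04 08:00:05')]})

stats = []
for ind, row in inc_times.iterrows():
    grp = result[(result.c_id == row.c_id) & (result.f_id == row.f_id)]
    window = grp[(grp.index >= row.start) & (grp.index < row.stop)]  # cheap in-memory slice
    stats.append({'incident_id': ind,
                  'num_rqsts': len(window),
                  'avg_resp_size': window.resp_len.mean()})

inc_stats = pd.DataFrame(stats)
print(int(inc_stats.num_rqsts[0]), float(inc_stats.avg_resp_size[0]))  # → 3 3.0
```

These in-memory comparisons cost microseconds per incident, versus ~5 seconds per disk query in the original workflow.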