Improve Query Performance From a Large HDFStore Table with Pandas


Problem Description



  

I have a large (~160 million rows) dataframe that I've stored to disk with something like this:

    def fillStore(store, tablename):
        files = glob.glob('201312*.csv')
        names = ["ts", "c_id", "f_id","resp_id","resp_len", "s_id"]
        for f in files:
            df = pd.read_csv(f, parse_dates=True, index_col=0, names=names)
            store.append(tablename, df, format='table', data_columns=['c_id','f_id'])

The table has a time index and I will query using c_id and f_id in addition to times (via the index).

I have another dataframe containing ~18000 "incidents." Each incident consists of some (as few as hundreds, as many as hundreds of thousands) individual records. I need to collect some simple statistics for each incident and store them in order to collect some aggregate statistics. Currently I do this like so:

def makeQueryString(c, f, start, stop):
    return "c_id == {} & f_id == {} & index >= Timestamp('{}') & index < Timestamp('{}')".format(c, f , str(pd.to_datetime(start)),str(pd.to_datetime(stop)))

def getIncidents(inc_times, store, tablename):
    incidents = pd.DataFrame(columns = ['c_id','f_id','resp_id','resp_len','s_id','incident_id'])
    for ind, row in inc_times.iterrows():
        incidents = incidents.append(store.select(tablename, 
                                                  makeQueryString(row.c_id, 
                                                                  row.f_id, 
                                                                  row.start, 
                                                                  row.stop))).fillna(ind)
    return incidents

This all works fine except for the fact that each store.select() statement takes roughly 5 seconds, which means that processing the full month's worth of data requires somewhere between 24 and 30 hours of processing. Meanwhile, the actual statistics I need are relatively simple:

def getIncidentStats(df):
    incLen = (df.index[-1]-df.index[0]).total_seconds()
    if incLen == 0:
        incLen = .1
    rqsts = len(df)
    rqstRate_s = rqsts/incLen
    return pd.Series({'c_id':df.c_id[0],
                      'f_id':df.f_id[0],
                      'Length_sec':incLen, 
                      'num_rqsts':rqsts, 
                      'rqst_rate':rqstRate_s, 
                      'avg_resp_size':df.resp_len.mean(), 
                      'std_resp_size':df.resp_len.std()})


incs = getIncidents(i_times, store, tablename)
inc_groups = incs.groupby('incident_id')
inc_stats = inc_groups.apply(getIncidentStats)

My question is: how can I improve the performance or efficiency of any part of this workflow? (Please note that I actually batch most of the jobs to get and store incidents one day at a time, simply because I want to limit the risk of losing already-processed data in the event of a crash. I left this code out here for simplicity and because I actually need to process the whole month's data.)

Is there a way to process the data as I receive it from the store, and is there any benefit to this? Would I benefit from using store.select_as_index? If I receive an index, I'd still need to access the data to get the statistics, correct?
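For example (to make this question concrete), I imagine something along these lines; this is only a rough sketch on my part, using the chunksize argument to store.select so that only one chunk is in memory at a time, and the statistics accumulated are placeholders:

def chunked_stats(store, tablename, query, chunksize=500000):
    # stream the selection in chunks and accumulate simple running statistics
    n = 0
    resp_sum = 0.0
    first_ts = last_ts = None
    for chunk in store.select(tablename, where=query, chunksize=chunksize):
        if len(chunk) == 0:
            continue
        n += len(chunk)
        resp_sum += chunk['resp_len'].sum()
        if first_ts is None:
            first_ts = chunk.index[0]
        last_ts = chunk.index[-1]
    return {'num_rqsts': n,
            'avg_resp_size': resp_sum / n if n else float('nan'),
            'start': first_ts,
            'stop': last_ts}

# (I believe the coordinate-returning method is actually called
# store.select_as_coordinates; it returns row positions only, so a second
# select would still be needed to pull the values themselves.)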

Other notes/questions: I have compared the performance of storing my HDFStore on both an SSD and a normal hard drive and didn't notice any improvement from the SSD. Is this expected?

I also toyed with the idea of creating a large conjunction of query strings and asking for them all at once. This causes memory errors when the total query string is too large (~5-10 queries).

Edit 1 If it matters, I am using tables version 3.1.0 and pandas version 0.13.1

Edit 2 Here is some more information:

ptdump -av store.h5
/ (RootGroup) ''
  /._v_attrs (AttributeSet), 4 attributes:
   [CLASS := 'GROUP',
    PYTABLES_FORMAT_VERSION := '2.0',
    TITLE := '',
    VERSION := '1.0']
/all_recs (Group) ''
  /all_recs._v_attrs (AttributeSet), 14 attributes:
   [CLASS := 'GROUP',
    TITLE := '',
    VERSION := '1.0',
    data_columns := ['c_id', 'f_id'],
    encoding := None,
    index_cols := [(0, 'index')],
    info := {1: {'type': 'Index', 'names': [None]}, 'index': {'index_name': 'ts'}},
    levels := 1,
    nan_rep := 'nan',
    non_index_axes := [(1, ['c_id', 'f_id', 'resp_id', 'resp_len', 'dns_server_id'])],
    pandas_type := 'frame_table',
    pandas_version := '0.10.1',
    table_type := 'appendable_frame',
    values_cols := ['values_block_0', 'c_id', 'f_id']]
/all_recs/table (Table(161738653,)) ''
  description := {
  "index": Int64Col(shape=(), dflt=0, pos=0),
  "values_block_0": Int64Col(shape=(3,), dflt=0, pos=1),
  "c_id": Int64Col(shape=(), dflt=0, pos=2),
  "f_id": Int64Col(shape=(), dflt=0, pos=3)}
  byteorder := 'little'
  chunkshape := (5461,)
  autoindex := True
  colindexes := {
    "index": Index(6, medium, shuffle, zlib(1)).is_csi=False,
    "f_id": Index(6, medium, shuffle, zlib(1)).is_csi=False,
    "c_id": Index(6, medium, shuffle, zlib(1)).is_csi=False}
  /all_recs/table._v_attrs (AttributeSet), 19 attributes:
   [CLASS := 'TABLE',
    FIELD_0_FILL := 0,
    FIELD_0_NAME := 'index',
    FIELD_1_FILL := 0,
    FIELD_1_NAME := 'values_block_0',
    FIELD_2_FILL := 0,
    FIELD_2_NAME := 'c_id',
    FIELD_3_FILL := 0,
    FIELD_3_NAME := 'f_id',
    NROWS := 161738653,
    TITLE := '',
    VERSION := '2.6',
    client_id_dtype := 'int64',
    client_id_kind := ['c_id'],
    fqdn_id_dtype := 'int64',
    fqdn_id_kind := ['f_id'],
    index_kind := 'datetime64',
    values_block_0_dtype := 'int64',
    values_block_0_kind := ['s_id', 'resp_len', 'resp_id']]

Here are samples of both the main table and inc_times:

In [12]: df.head()
Out[12]: 
                          c_id        f_id          resp_id      resp_len  \
ts                                                                   
2013-12-04 08:00:00  637092486  5372764353               30      56767543   
2013-12-04 08:00:01  637092486  5399580619               23      61605423   
2013-12-04 08:00:04    5456242  5385485460               21      46742687   
2013-12-04 08:00:04    5456242  5385485460               21      49909681   
2013-12-04 08:00:04  624791800  5373236646               14      70461449   

                              s_id  
ts                           
2013-12-04 08:00:00           1829  
2013-12-04 08:00:01           1724  
2013-12-04 08:00:04           1679  
2013-12-04 08:00:04           1874  
2013-12-04 08:00:04           1727  

[5 rows x 5 columns]


In [13]: inc_times.head()
Out[13]: 
        c_id     f_id                start                 stop
0       7254   196211  1385880945000000000  1385880960000000000
1       9286   196211  1387259840000000000  1387259850000000000
2      16032   196211  1387743730000000000  1387743735000000000
3      19793   196211  1386208175000000000  1386208200000000000
4      19793   196211  1386211800000000000  1386211810000000000

[5 rows x 4 columns]

Regarding c_id and f_id, the set of IDs I want to select from the full store is relatively small compared to the total number of IDs in the store. In other words, there are some popular IDs in inc_times that I will repeatedly query, while completely ignoring some of the IDs that exist in the full table. I'd estimate that the IDs I care about are roughly 10% of the total IDs, but that these are the most popular IDs, so their records dominate the full set.

I have 16GB RAM. The full store is 7.4G and the full dataset (as a csv file) is only 8.7 GB. Initially I believed I would be able to load the whole thing in memory and at least do some limited operations on it, but I get memory errors on loading the whole thing. Hence, batching it into daily files (the full file consists of data for one month).

Solution

Here are some recommendations; a similar question is here.

Use compression: see here. You should try this (this could make it faster / slower depending on exactly what you are querying), YMMV.

ptrepack --chunkshape=auto --propindexes --complevel=9 --complib=blosc in.h5 out.h5
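If repacking an existing file is inconvenient, you can also pick the compression when the store is first written; a small sketch (the file name is illustrative, the schema follows the question):

import glob
import pandas as pd

# write-time blosc compression, as an alternative to repacking afterwards
names = ["ts", "c_id", "f_id", "resp_id", "resp_len", "s_id"]
store = pd.HDFStore('store_blosc.h5', mode='w', complevel=9, complib='blosc')
for f in glob.glob('201312*.csv'):
    df = pd.read_csv(f, parse_dates=True, index_col=0, names=names)
    store.append('all_recs', df, format='table', data_columns=['c_id', 'f_id'])
store.close()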

Use a hierarchical query in chunks. What I mean is this. Since you have a relatively small number of c_id and f_id that you care about, structure a single query something like this. This is kind of like using isin.

f_id_list = [...]  # the f_ids I care about
c_id_list = [...]  # the c_ids I care about

def create_batches(l, maxn=32):
    """ create a list of batches, maxed at maxn """
    batches = []
    while(True):
        if len(l) <= maxn:
            if len(l) > 0:
                batches.append(l)
            break
        batches.append(l[0:maxn])
        l = l[maxn:]
    return batches


results = []
for f_id_batch in create_batches(f_id_list):

    for c_id_batch in create_batches(c_id_list):

        q = "f_id={f_id} & c_id={c_id}".format(
                f_id=f_id_batch,
                c_id=c_id_batch)

        # you can include the max/min times in here as well (they would be the max/min
        # times for ALL the included batches though; see the sketch after this block)

        result = store.select('df',where=q)

        # sub process this result

        def f(x):
            # you will need to filter out the min/max timestamps here (which I gather
            # are somewhat dependent on the f_id/c_id group)

            #### process the data and return something
            # you could do something like: ``return x.describe()`` for simple stats
            return x.describe()

        results.append(result.groupby(['f_id', 'c_id']).apply(f))

results = pd.concat(results)
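To make the max/min-times comment above concrete, here is an untested sketch of one way to add coarse time bounds per batch and then cut exact per-incident windows from the in-memory result. batch_query_with_times is a hypothetical helper; inc_times is the frame from the question, with start/stop stored as nanosecond integers.

def batch_query_with_times(f_id_batch, c_id_batch, inc_times):
    # coarse bounds: min start / max stop over the incidents covered by this batch
    sub = inc_times[inc_times.f_id.isin(f_id_batch) &
                    inc_times.c_id.isin(c_id_batch)]
    start = pd.to_datetime(sub.start.min())
    stop = pd.to_datetime(sub.stop.max())
    q = ("f_id={f_id} & c_id={c_id} & "
         "index>=Timestamp('{start}') & index<Timestamp('{stop}')").format(
             f_id=f_id_batch, c_id=c_id_batch, start=start, stop=stop)
    return q, sub

# then, inside the double loop:
#   q, sub = batch_query_with_times(f_id_batch, c_id_batch, inc_times)
#   result = store.select('df', where=q)
#   for ind, row in sub.iterrows():
#       piece = result[(result.c_id == row.c_id) & (result.f_id == row.f_id)]
#       piece = piece[(piece.index >= pd.to_datetime(row.start)) &
#                     (piece.index < pd.to_datetime(row.stop))]
#       # compute the per-incident stats on ``piece``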

The key here is to process in batches so that the isin does NOT have more than 32 members for any variable that you are querying on. This is an internal numpy/pytables limitation. If you exceed this, the query will still work, but it will drop that variable and do a reindex on ALL the data (which is NOT what you want here).

This way you will have a nice subset of the data in memory over just a few loops. Each of these queries should take roughly as long as one of your current queries, but you will run far fewer of them.

The query time is roughly constant for a given subset (unless the data is ordered such that it is completely indexed).

So the query scans 'blocks' of data (which is what the indexes point to). If you have lots of hits across many blocks then the query is slower.
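One related thing you could experiment with (not something benchmarked on your data, so treat it as a hedged suggestion): your ptdump shows is_csi=False on the column indexes. You can rebuild the data-column indexes as full (CSI) indexes, and ptrepack can then physically sort the table, which is what "completely indexed" really wants.

# rebuild the data-column indexes as full (CSI) indexes; optlevel/kind are illustrative
store = pd.HDFStore('store.h5')
store.create_table_index('all_recs', columns=['c_id', 'f_id'],
                         optlevel=9, kind='full')
store.close()

# afterwards you could try physically sorting the table (this needs a CSI index
# on the sort column first):
# ptrepack --sortby=index --propindexes store.h5 store_sorted.h5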

Here's an example

In [5]: N = 100000000

In [6]: df = DataFrame(np.random.randn(N,3),columns=['A','B','C'])

In [7]: df['c_id'] = np.random.randint(0,10,size=N)

In [8]: df['f_id'] = np.random.randint(0,10,size=N)

In [9]: df.index = date_range('20130101',periods=N,freq='s')

In [10]: df.to_hdf('test2.h5','df',mode='w',data_columns=['c_id','f_id'])

In [11]: df.head()
Out[11]: 
                            A         B         C  c_id  f_id
2013-01-01 00:00:00  0.037287  1.153534  0.639669     8     7
2013-01-01 00:00:01  1.741046  0.459821  0.194282     8     3
2013-01-01 00:00:02 -2.273919 -0.141789  0.770567     1     1
2013-01-01 00:00:03  0.320879 -0.108426 -1.310302     8     6
2013-01-01 00:00:04 -1.445810 -0.777090 -0.148362     5     5
2013-01-01 00:00:05  1.608211  0.069196  0.025021     3     6
2013-01-01 00:00:06 -0.561690  0.613579  1.071438     8     2
2013-01-01 00:00:07  1.795043 -0.661966  1.210714     0     0
2013-01-01 00:00:08  0.176347 -0.461176  1.624514     3     6
2013-01-01 00:00:09 -1.084537  1.941610 -1.423559     9     1
2013-01-01 00:00:10 -0.101036  0.925010 -0.809951     0     9
2013-01-01 00:00:11 -1.185520  0.968519  2.871983     7     5
2013-01-01 00:00:12 -1.089267 -0.333969 -0.665014     3     6
2013-01-01 00:00:13  0.544427  0.130439  0.423749     5     7
2013-01-01 00:00:14  0.112216  0.404801 -0.061730     5     4
2013-01-01 00:00:15 -1.349838 -0.639435  0.993495     0     9


In [2]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1] & c_id=[2]")
1 loops, best of 3: 13.9 s per loop

In [3]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1,2] & c_id=[1,2]")
1 loops, best of 3: 21.2 s per loop

In [4]: %timeit pd.read_hdf('test2.h5','df',where="f_id=[1,2,3] & c_id=[1,2,3]")
1 loops, best of 3: 42.8 s per loop

This particular example is 5GB uncompressed and 2.9GB compressed. These results are on the compressed data. In THIS case it is actually quite a bit faster to use the uncompressed file (e.g. the first query took 3.5s). This is 100MM rows.

So using the last example (4) you are getting 9x the data of the first in a little over 3x the query time.

However, your speedup should be MUCH greater, because you won't be selecting on individual timestamps; rather, you'll do that filtering later.

This whole approach assumes that you have enough main memory to hold the results at these batch sizes (i.e. you are selecting a relatively small part of the full set in each batch query).
