How to handle incoming real time data with python pandas


Problem Description

Which is the most recommended/pythonic way of handling live incoming data with pandas?

Every few seconds I'm receiving a data point in the format below:

{'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH',
 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}

I would like to append it to an existing DataFrame and then run some analysis on it.

The problem is, just appending rows with DataFrame.append can lead to performance issues with all that copying.
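
A purely illustrative sketch of that problem (it assumes a pandas version that still has DataFrame.append, which was deprecated and later removed in pandas 2.0): each call returns a brand-new frame, so appending n rows one at a time does O(n^2) work overall.

import pandas as pd

df = pd.DataFrame(columns=['high', 'low', 'open', 'close'])
for i in range(1000):
    row = {'high': 4.0, 'low': 3.0, 'open': 2.0, 'close': 1.0}
    df = df.append(row, ignore_index=True)   # copies the whole frame every iteration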

A few people suggested preallocating a big DataFrame and updating it as data comes in:

In [1]: index = pd.date_range(start='2013-01-01 00:00:00', freq='S', periods=5)

In [2]: columns = ['high', 'low', 'open', 'close']

In [3]: df = pd.DataFrame(index=index, columns=columns)

In [4]: df
Out[4]: 
                    high  low open close
2013-01-01 00:00:00  NaN  NaN  NaN   NaN
2013-01-01 00:00:01  NaN  NaN  NaN   NaN
2013-01-01 00:00:02  NaN  NaN  NaN   NaN
2013-01-01 00:00:03  NaN  NaN  NaN   NaN
2013-01-01 00:00:04  NaN  NaN  NaN   NaN

In [5]: data = {'time' :'2013-01-01 00:00:02', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}

In [6]: data_ = pd.Series(data)

In [7]: df.loc[data['time']] = data_

In [8]: df
Out[8]: 
                    high  low open close
2013-01-01 00:00:00  NaN  NaN  NaN   NaN
2013-01-01 00:00:01  NaN  NaN  NaN   NaN
2013-01-01 00:00:02    4    3    2     1
2013-01-01 00:00:03  NaN  NaN  NaN   NaN
2013-01-01 00:00:04  NaN  NaN  NaN   NaN

The other alternative is building a list of dicts. Simply appending the incoming data to a list and slicing it into smaller DataFrames to do the work.

In [9]: ls = []

In [10]: for n in range(5):
   .....:     # Naive stuff ahead =)
   .....:     time = '2013-01-01 00:00:0' + str(n)
   .....:     d = {'time' : time, 'stock' : 'BLAH', 'high' : np.random.rand()*10, 'low' : np.random.rand()*10, 'open' : np.random.rand()*10, 'close' : np.random.rand()*10}
   .....:     ls.append(d)

In [11]: df = pd.DataFrame(ls[1:3]).set_index('time')

In [12]: df
Out[12]: 
                        close      high       low      open stock
time                                                             
2013-01-01 00:00:01  3.270078  1.008289  7.486118  2.180683  BLAH
2013-01-01 00:00:02  3.883586  2.215645  0.051799  2.310823  BLAH

or something like that, maybe processing the input a little bit more.
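
For example, one extra bit of processing (an assumption, not spelled out above) could be to parse the 'time' strings so the slice gets a proper DatetimeIndex, reusing the ls list built above:

df = pd.DataFrame(ls)
df['time'] = pd.to_datetime(df['time'])     # parse the timestamp strings
df = df.set_index('time').sort_index()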

Recommended Answer

I would use HDF5/pytables as follows:

  1. Keep the data as a python list "as long as possible".
  2. Append your results to that list.
  3. When it gets "big":
    • push to HDF5 Store using pandas io (and an appendable table).
    • clear the list.

In fact, the function I define uses a list for each "key" so that you can store multiple DataFrames to the HDF5 Store in the same process.

We define a function which you call with each row d:

import pandas as pd

CACHE = {}
STORE = 'store.h5'   # Note: another option is to keep the actual file open

def process_row(d, key, max_len=5000, _cache=CACHE):
    """
    Append row d to the store 'key'.

    When the number of items in the key's cache reaches max_len,
    append the list of rows to the HDF5 store and clear the list.

    """
    # keep the rows for each key separate.
    lst = _cache.setdefault(key, [])
    if len(lst) >= max_len:
        store_and_clear(lst, key)
    lst.append(d)

def store_and_clear(lst, key):
    """
    Convert key's cache list to a DataFrame and append that to HDF5.
    """
    df = pd.DataFrame(lst)
    with pd.HDFStore(STORE) as store:
        store.append(key, df)
    lst.clear()

Note: we use the with statement to automatically close the store after each write. It may be faster to keep the store open, but if so it's recommended that you flush regularly (closing flushes). Also note that a collections.deque might be more readable than a list, but a list will perform slightly better here.
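
If you do try keeping the file open, a minimal sketch of that variant (reusing STORE and pd from the snippet above; the function name is an assumption, not part of the answer) could look like:

store = pd.HDFStore(STORE)        # opened once and kept open for the whole job

def store_and_clear_open(lst, key):
    """Variant of store_and_clear that reuses the already-open store."""
    store.append(key, pd.DataFrame(lst))
    store.flush()                 # closing flushes; with an open store, flush explicitly
    lst.clear()

# when the job has completely finished:
# store.close()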

To use this, call process_row with each incoming row:

process_row({'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0},
            key="df")

Note: "df" is the stored key used in the pytables store.

Once the job has finished ensure you store_and_clear the remaining cache:

for k, lst in CACHE.items():  # you can instead use .iteritems() in python 2
    store_and_clear(lst, k)

Now your complete DataFrame is available via:

with pd.HDFStore(STORE) as store:
    df = store["df"]                    # other keys will be store[key]

Some comments:

    • 5000 can be adjusted; try some smaller/larger numbers to suit your needs.
    • List append is O(1), DataFrame append is O(len(df)).
    • Until you're doing stats or data-munging you don't need pandas, so use whatever is fastest.
    • This code works with multiple keys (data points) coming in.
    • This is very little code, and we stay in a vanilla python list and then a pandas DataFrame...
    • Additionally, to get up-to-date reads you could define a get method which stores and clears before reading; that way you get the most up-to-date data:

      def get_latest(key, _cache=CACHE):
          store_and_clear(_cache[key], key)
          with pd.HDFStore(STORE) as store:
              return store[key]
      

      Now when you access with:

      df = get_latest("df")
      

      you'll get the latest "df" available.

      Another option is slightly more involved: define a custom table in vanilla pytables, see the tutorial.

      Note: you need to know the field names ahead of time in order to create the column descriptor.
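
      A minimal sketch of that pytables option (the table and column names below are assumptions mirroring the incoming data point, not something given in the answer):

      import tables   # PyTables

      class Tick(tables.IsDescription):
          # the column descriptor: field names must be known ahead of time
          time  = tables.StringCol(19)    # e.g. '2013-01-01 00:00:00'
          stock = tables.StringCol(8)
          high  = tables.Float64Col()
          low   = tables.Float64Col()
          open  = tables.Float64Col()
          close = tables.Float64Col()

      h5 = tables.open_file('ticks.h5', mode='w')
      table = h5.create_table('/', 'ticks', Tick)

      row = table.row
      row['time'], row['stock'] = '2013-01-01 00:00:02', 'BLAH'
      row['high'], row['low'], row['open'], row['close'] = 4.0, 3.0, 2.0, 1.0
      row.append()
      table.flush()
      h5.close()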
