pydata blaze: does it allow parallel processing or not?

Problem description

I am looking to parallelise numpy or pandas operations. For this I have been looking into pydata's blaze. My understanding was that seamless parallelisation was its major selling point.

Unfortunately I have been unable to find an operation that runs on more than one core. Is parallel processing in blaze available yet or currently only a stated aim? Am I doing something wrong? I am using blaze v0.6.5.

Example of one function I was hoping to parallelise: (deduplication of a pytables column too large to fit in memory)

import pandas as pd
import blaze as bz

def f1():
    # 'my_store.h5' is a placeholder path: the original snippet assumed
    # an already-open pd.HDFStore named `store`.
    store = pd.HDFStore('my_store.h5')
    counter = 0
    groups = pd.DataFrame(columns=['name'])
    # Symbolic blaze expression: distinct over a one-column table.
    t = bz.TableSymbol('t', '{name: string}')
    e = bz.distinct(t)
    for chunk in store.select('my_names', columns=['name'],
                              chunksize=100000):
        counter += 1
        print('processing chunk %d' % counter)
        # Append the chunk, then evaluate the distinct expression
        # against the accumulated pandas DataFrame.
        groups = pd.concat([groups, chunk])
        groups = bz.compute(e, groups)
    store.close()
    return groups
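
For comparison, the same chunked deduplication can be written with pandas alone, dropping duplicates as each chunk arrives so the accumulator stays small. This is only a minimal single-core sketch to clarify what f1 is doing, not an answer to the parallelism question; the store path 'my_store.h5' and the 'my_names' table name are assumptions, not part of the original question.

import pandas as pd

def dedup_names(path='my_store.h5'):
    groups = pd.DataFrame(columns=['name'])
    with pd.HDFStore(path) as store:
        for chunk in store.select('my_names', columns=['name'],
                                  chunksize=100000):
            # Drop duplicates after every chunk so `groups` holds
            # only the distinct names seen so far.
            groups = pd.concat([groups, chunk]).drop_duplicates()
    return groups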


Edit 1

I have had problems following Phillip's examples:

In [1]: from blaze import Data, compute

In [2]: d = Data('test.bcolz')

In [3]: d.head(5)
Out[3]: <repr(<blaze.expr.collections.Head at 0x7b5e300>) failed: NotImplementedError: Don't know how to compute:
expr: _1.head(5).head(11)
data: {_1: ctable((8769257,), [('index', '<i8'), ('date', 'S10'), ('accessDate', 'S26')])
  nbytes: 367.97 MB; cbytes: 35.65 MB; ratio: 10.32
  cparams := cparams(clevel=5, shuffle=True, cname='blosclz')
  rootdir := 'test.bcolz'
[(0L, '2014-12-12', '2014-12-14T17:39:19.716000')
 (1L, '2014-12-11', '2014-12-14T17:39:19.716000')
 (2L, '2014-12-10', '2014-12-14T17:39:19.716000') ...,
 (1767L, '2009-11-11', '2014-12-15T13:32:39.906000')
 (1768L, '2009-11-10', '2014-12-15T13:32:39.906000')
 (1769L, '2009-11-09', '2014-12-15T13:32:39.906000')]}>

My environment:

C:\Anaconda>conda list blaze
# packages in environment at C:\Anaconda:
#
blaze                     0.6.8               np19py27_69

But note that blaze seems to report the wrong version:

In [5]: import blaze

In [6]: blaze.__version__
Out[6]: '0.6.7'

With other data sources blaze seems to work:

In [6]: d = Data([1,2,2,2,3,4,4,4,5,6])

In [7]: d.head(5)
Out[7]:
   _2
0   1
1   2
2   2
3   2
4   3

In [16]: list(compute(d._2.distinct()))
Out[16]: [1, 2, 3, 4, 5, 6]

Answer

Note: The example below requires the latest version of blaze, which you can get via

conda install -c blaze blaze

You'll also need the latest version of the nascent into project, installed from master, which you can do with

pip install git+git://github.com/ContinuumIO/into.git

You can't do "seamless" parallelization with an arbitrary backend, but the bcolz backend supports parallelization in a nice way. Here's an example with the NYC Taxi trip/fare dataset

Note: I've combined both the trip and fare datasets into a single dataset. There are 173,179,759 rows in the dataset.

In [28]: from blaze import Data, compute

In [29]: ls -d *.bcolz
all.bcolz/  fare.bcolz/ trip.bcolz/

In [30]: d = Data('all.bcolz')

In [31]: d.head(5)
Out[31]:
                          medallion                      hack_license  \
0  89D227B655E5C82AECF13C3F540D4CF4  BA96DE419E711691B9445D6A6307C170
1  0BD7C8F5BA12B88E0B67BED28BEA73D8  9FD8F69F0804BDB5549F40E9DA1BE472
2  0BD7C8F5BA12B88E0B67BED28BEA73D8  9FD8F69F0804BDB5549F40E9DA1BE472
3  DFD2202EE08F7A8DC9A57B02ACB81FE2  51EE87E3205C985EF8431D850C786310
4  DFD2202EE08F7A8DC9A57B02ACB81FE2  51EE87E3205C985EF8431D850C786310

  vendor_id  rate_code store_and_fwd_flag     pickup_datetime  \
0       CMT          1                  N 2013-01-01 15:11:48
1       CMT          1                  N 2013-01-06 00:18:35
2       CMT          1                  N 2013-01-05 18:49:41
3       CMT          1                  N 2013-01-07 23:54:15
4       CMT          1                  N 2013-01-07 23:25:03

     dropoff_datetime  passenger_count  trip_time_in_secs  trip_distance  \
0 2013-01-01 15:18:10                4                382            1.0
1 2013-01-06 00:22:54                1                259            1.5
2 2013-01-05 18:54:23                1                282            1.1
3 2013-01-07 23:58:20                2                244            0.7
4 2013-01-07 23:34:24                1                560            2.1

     ...     pickup_latitude  dropoff_longitude  dropoff_latitude  \
0    ...           40.757977         -73.989838         40.751171
1    ...           40.731781         -73.994499         40.750660
2    ...           40.737770         -74.009834         40.726002
3    ...           40.759945         -73.984734         40.759388
4    ...           40.748528         -74.002586         40.747868

   tolls_amount  tip_amount  total_amount  mta_tax  fare_amount  payment_type  \
0             0           0           7.0      0.5          6.5           CSH
1             0           0           7.0      0.5          6.0           CSH
2             0           0           7.0      0.5          5.5           CSH
3             0           0           6.0      0.5          5.0           CSH
4             0           0          10.5      0.5          9.5           CSH

  surcharge
0       0.0
1       0.5
2       1.0
3       0.5
4       0.5

[5 rows x 21 columns]

To add process-based parallelism, we bring in the Pool class from the multiprocessing stdlib module, and pass the Pool instance's map method as a keyword argument to compute:

In [32]: from multiprocessing import Pool

In [33]: p = Pool()

In [34]: %timeit -n 1 -r 1 values = compute(d.medallion.distinct())
1 loops, best of 1: 1min per loop

In [35]: %timeit -n 1 -r 1 values = compute(d.medallion.distinct(), map=p.map)
1 loops, best of 1: 16.2 s per loop

So, roughly a 3x speedup for one extra line of code. Note that medallion is a string column, and string columns tend to be very inefficient to process compared to other types. A distinct expression computed over an integer column finishes in about 1 second with multiple cores, versus about 3 seconds on a single core (so, roughly the same relative improvement in running time):

In [38]: %timeit -n 1 -r 1 values = compute(d.passenger_count.distinct())
1 loops, best of 1: 3.33 s per loop

In [39]: %timeit -n 1 -r 1 values = compute(d.passenger_count.distinct(), map=p.map)
1 loops, best of 1: 1.01 s per loop
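
Putting the pieces together, here is a minimal, self-contained sketch of the pattern from this answer: write a DataFrame to an on-disk bcolz ctable, point blaze at the directory, and pass a Pool's map to compute. The 'names.bcolz' path and the toy data are assumptions for illustration, and the bcolz.ctable.fromdataframe keyword arguments reflect the bcolz API of this era; the toy dataset is far too small to show a real speedup.

import bcolz
import pandas as pd
from blaze import Data, compute
from multiprocessing import Pool

if __name__ == '__main__':   # guard required for multiprocessing on Windows
    df = pd.DataFrame({'name': ['a', 'b', 'a', 'c', 'b', 'c']})
    # Persist to an on-disk bcolz ctable; blaze's bcolz backend
    # computes over it chunk by chunk.
    bcolz.ctable.fromdataframe(df, rootdir='names.bcolz', mode='w')

    d = Data('names.bcolz')
    p = Pool()  # one worker process per core by default
    # Passing the pool's map makes compute fan the per-chunk work out
    # across worker processes, as in the timings above.
    values = compute(d.name.distinct(), map=p.map)
    print(sorted(values))  # ['a', 'b', 'c']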
