Parallel read_table in pandas

Problem Description

Is there a way to parallelize an invocation of read_table()? In my case it's CPU-bound due to date parsing. I don't see any way to achieve that from reading the docs. The only thing that comes to mind is splitting the input file, calling read_table in parallel, and then concatenating the resulting DataFrames.
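
For concreteness, that split-and-concatenate idea might be sketched with multiprocessing like this (the part filenames and the 'date' column are hypothetical, and the file is assumed to be pre-split into parts that each carry a header row):

import pandas as pd
from multiprocessing import Pool

def read_part(path):
    # Each worker does its own date parsing, so the CPU-bound work is
    # spread across cores.
    return pd.read_table(path, parse_dates=['date'])

if __name__ == '__main__':
    parts = ['input_part0.tsv', 'input_part1.tsv', 'input_part2.tsv']
    pool = Pool()
    df = pd.concat(pool.map(read_part, parts))
    pool.close()
    pool.join()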

Recommended Answer

This will read the CSV files in parallel and concatenate them. The annoying bit is that it won't handle numpy types, so it can't parse dates. I have been struggling with the same problem, but so far it seems that libraries such as execnet can't handle types that aren't built-in. That's why I turn the DataFrames into JSON before sending; it strips the types down to basic Python ones.
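
For illustration, the JSON round trip is just this (a minimal sketch; note that read_json re-infers dtypes on the receiving side rather than preserving them exactly):

from pandas import DataFrame, read_json

df = DataFrame({'a': [1, 2], 'b': [3.5, 4.5]})
payload = df.to_json()         # a plain str, safe to send over an execnet channel
restored = read_json(payload)  # dtypes are re-inferred from the JSON values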

If you need to parse dates, a more sensible approach might be to read the CSV files remotely, parse the dates, and save the frames as pickles on disk. Then you could read the pickle files in the main process and concatenate them. I haven't tried it to see whether it leads to a performance gain.
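
Sketched with multiprocessing instead of execnet (the 'date' column is an assumption), that variant might look like:

import pandas as pd
from multiprocessing import Pool

def csv_to_pickle(path):
    # Parse dates in the worker (the CPU-bound step), then persist the
    # typed frame; unlike JSON, pickle keeps numpy dtypes intact.
    df = pd.read_csv(path, parse_dates=['date'])  # 'date' column assumed
    out = path + '.pkl'
    df.to_pickle(out)
    return out

if __name__ == '__main__':
    names = ['d1.csv', 'd2.csv', 'd3.csv']
    pool = Pool()
    pickles = pool.map(csv_to_pickle, names)
    pool.close()
    pool.join()
    df = pd.concat([pd.read_pickle(p) for p in pickles], keys=names)

The execnet version with the JSON trick follows; first the remote module: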

remote_read_csv.py

import cPickle as pickle

if __name__ == '__channelexec__':
    # The parent sends a pickled reader callable (here pandas.read_csv) first.
    reader = pickle.loads(channel.receive())

    # Every later message is a filename; reply with (filename, JSON) so the
    # parent can match results to files regardless of completion order.
    for filename in channel:
        channel.send((filename, reader(filename).to_json()))

The code below makes use of the module above. I tested it in IPython.

from pandas import DataFrame, concat, read_csv, read_json
from numpy import random
import execnet
import remote_read_csv
import cPickle as pickle
import itertools
import psutil

### Create dummy data and save to CSV

def rdf():
    return DataFrame((random.rand(4, 3) * 100).astype(int))

d1 = rdf()
d2 = rdf()
d3 = rdf()

dfsl = [d1, d2, d3]
names = 'd1.csv d2.csv d3.csv'.split()
for i in range(3):
    dfsl[i].to_csv(names[i])

### Read CSV files in separate threads then concatenate

# Pickle the reader callable so it can be shipped over the execnet channels.
reader = pickle.dumps(read_csv)

def set_gateways(remote_module, *channel_sends):
    # Spawn one gateway (worker interpreter) per CPU and prime each channel
    # with the initial payloads (here, the pickled reader).
    gateways = []
    channels = []
    for i in range(psutil.NUM_CPUS):  # psutil.cpu_count() in newer psutil
        gateways.append(execnet.makegateway())
        channels.append(gateways[i].remote_exec(remote_module))
        for send in channel_sends:
            channels[i].send(send)
    return (gateways, channels)

def para_read(names):
    gateways, channels = set_gateways(remote_read_csv, reader)
    mch = execnet.MultiChannel(channels)
    queue = mch.make_receive_queue()
    channel_ring = itertools.cycle(mch)
    # Hand the filenames out round-robin over the worker channels.
    for f in names:
        channel = next(channel_ring)
        channel.send(f)
    # Replies arrive in completion order, so key them by filename to keep
    # the frames aligned with `names` in the final concat.
    results = {}
    for i in range(len(names)):
        channel, (f, df_json) = queue.get()
        results[f] = df_json

    for gw in gateways:
        gw.exit()
    return concat([read_json(results[f]) for f in names], keys=names)

para_read(names)
