Parallel read_table in pandas
Question
Is there a way to parallelize a call to read_table()? In my case it is CPU bound because of date parsing. I don't see any way to achieve that from reading the docs. The only thing that comes to mind is splitting the input file, calling read_table in parallel, and then concatenating the resulting dataframes.
Answer
This will read the CSV files in parallel and concatenate them. The annoying bit is that it won't handle numpy types, so it can't parse dates. I have been struggling with the same problem, but so far it seems that libraries such as execnet can't handle types that are not built-in. That's why I turn the DataFrames into JSON before sending: it strips the types down to basic Python ones.
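The type-stripping behaviour mentioned above can be seen without execnet at all: a datetime64 column does not survive a to_json/read_json round trip. The column name 'when' here is invented for illustration.

```python
# Demonstrates why the answer says the JSON step 'strips the types':
# a round trip through to_json/read_json loses the datetime64 dtype.
from io import StringIO

import pandas as pd

df = pd.DataFrame({'when': pd.to_datetime(['2013-01-01', '2013-01-02'])})
roundtrip = pd.read_json(StringIO(df.to_json()))

# Depending on the pandas version the dates come back as integers
# (epoch milliseconds) or strings, but either way the original
# datetime64 dtype is gone and would have to be restored by hand.
print(df['when'].dtype)
print(roundtrip['when'].dtype)
```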
If you need to parse dates, maybe a more sensible approach would be to read the CSV files remotely, parse the dates, and save the results as pickles on the hard drive. Then you could read the pickle files in the main process and concatenate them. I haven't tried that to see whether it would lead to a performance gain.
remote_read_csv.py
import cPickle as pickle

if __name__ == '__channelexec__':
    # 'channel' is injected by execnet when this module runs remotely
    reader = pickle.loads(channel.receive())
    for filename in channel:
        channel.send(reader(filename).to_json())
The code below makes use of the module above. I tested it in IPython.
from pandas import DataFrame, concat, read_csv, read_json
from numpy import random
import execnet
import remote_read_csv
import cPickle as pickle
import itertools
import psutil

### Create dummy data and save to CSV

def rdf():
    return DataFrame((random.rand(4, 3) * 100).astype(int))

d1 = rdf()
d2 = rdf()
d3 = rdf()
dfsl = [d1, d2, d3]
names = 'd1.csv d2.csv d3.csv'.split()
for i in range(3):
    dfsl[i].to_csv(names[i])

### Read CSV files in separate processes then concatenate

reader = pickle.dumps(read_csv)

def set_gateways(remote_module, *channel_sends):
    gateways = []
    channels = []
    for i in range(psutil.NUM_CPUS):
        gateways.append(execnet.makegateway())
        channels.append(gateways[i].remote_exec(remote_module))
        for send in channel_sends:
            channels[i].send(send)
    return (gateways, channels)

def para_read(names):
    gateways, channels = set_gateways(remote_read_csv, reader)
    mch = execnet.MultiChannel(channels)
    queue = mch.make_receive_queue()
    channel_ring = itertools.cycle(mch)
    for f in names:
        channel = channel_ring.next()
        channel.send(f)
    dfs = []
    for i in range(len(names)):
        channel, df = queue.get()
        dfs.append(df)
    [gw.exit() for gw in gateways]
    return concat([read_json(i) for i in dfs], keys=names)

para_read(names)