concurrent.futures not parallelizing write
Problem description
I have a list dataframe_chunk which contains chunks of a very large pandas dataframe. I would like to write every single chunk into a different csv, and to do so in parallel. However, I see the files being written sequentially and I'm not sure why this is the case. Here's the code:
import concurrent.futures as cfu
import time

def write_chunk_to_file(chunk, fpath):
    chunk.to_csv(fpath, sep=',', header=False, index=False)

pool = cfu.ThreadPoolExecutor(N_CORES)
futures = []
for i in range(N_CORES):
    fpath = '/path_to_files_'+str(i)+'.csv'
    futures.append(pool.submit( write_chunk_to_file(dataframe_chunk[i], fpath)))

for f in cfu.as_completed(futures):
    print("finished at ",time.time())
Any clues?
Recommended answer
One thing that is stated in the Python 2.7.x threading docs, but not in the 3.x docs, is that Python cannot achieve true parallelism using the threading library: because of CPython's Global Interpreter Lock (GIL), only one thread executes Python bytecode at a time.
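Separately from the GIL, the submit call in the question also serializes the work. `Executor.submit` expects the callable and its arguments separately (`pool.submit(fn, *args)`). Writing `pool.submit(write_chunk_to_file(dataframe_chunk[i], fpath))` runs the call immediately in the main thread, one chunk after another, and then submits its return value, `None`. The minimal sketch below demonstrates this; `write_stub`, `calls`, `bad` and `good` are hypothetical names for illustration, not from the original code.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

calls = []  # (label, thread name) for every call that actually ran

def write_stub(label):
    # Hypothetical stand-in for write_chunk_to_file: records which thread ran it
    calls.append((label, threading.current_thread().name))

with ThreadPoolExecutor(max_workers=2) as pool:
    # Pattern from the question: write_stub("eager") is evaluated right here,
    # in the main thread, and its return value (None) is what gets submitted.
    bad = pool.submit(write_stub("eager"))
    # Intended pattern: pass the callable and its arguments separately,
    # so that a worker thread performs the call.
    good = pool.submit(write_stub, "lazy")
    good.result()

# The worker failed when it tried to call None. as_completed() alone never
# surfaces that error; it only shows up via exception() or result().
print(type(bad.exception()).__name__)  # TypeError
print(calls[0])                        # ('eager', 'MainThread')
```

Applied to the question's loop, the call would read `pool.submit(write_chunk_to_file, dataframe_chunk[i], fpath)`. How much the writes then overlap with threads still depends on the GIL.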
You should try using concurrent.futures with the ProcessPoolExecutor, which runs jobs in a pool of separate worker processes and can therefore achieve true parallelism on a multi-core CPU.
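A minimal sketch of that suggestion applied to the question's task. Assumptions: so that it runs without pandas, the chunks here are plain lists of rows written with the standard `csv` module; with real DataFrame chunks the worker body would instead be `chunk.to_csv(fpath, header=False, index=False)`. The helper `write_all`, its `n_workers` parameter, and the `file_{i}.csv` names are hypothetical.

```python
import csv
import os
import tempfile
from concurrent.futures import ProcessPoolExecutor, as_completed

def write_chunk_to_file(chunk, fpath):
    # Hypothetical stand-in: `chunk` is a list of rows rather than a DataFrame.
    with open(fpath, "w", newline="") as f:
        csv.writer(f).writerows(chunk)
    return fpath

def write_all(chunks, out_dir, n_workers=4):
    # Hypothetical helper: write each chunk to its own CSV in a worker process.
    paths = []
    with ProcessPoolExecutor(max_workers=n_workers) as pool:
        # Function and arguments are passed separately, so each call
        # happens inside a worker process, not in the parent.
        futures = [
            pool.submit(write_chunk_to_file, chunk,
                        os.path.join(out_dir, f"file_{i}.csv"))
            for i, chunk in enumerate(chunks)
        ]
        for fut in as_completed(futures):
            # result() re-raises any exception raised in the worker
            paths.append(fut.result())
    return sorted(paths)

if __name__ == "__main__":
    # Made-up demo data: 4 chunks of 3 identical rows each
    chunks = [[[i, i * 10]] * 3 for i in range(4)]
    out_dir = tempfile.mkdtemp()
    written = write_all(chunks, out_dir)
    print(len(written))  # 4
```

Each chunk is pickled and copied to a worker process, so for very large DataFrames that transfer has its own cost. The `if __name__ == "__main__":` guard is needed on platforms where multiprocessing starts workers with the spawn method (Windows, macOS).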
Update
Here is your program adapted to use the multiprocessing library instead:
#!/usr/bin/env python3
from multiprocessing import Process
import os
import time

N_CORES = 8

def write_chunk_to_file(chunk, fpath):
    # Demo workload: ignores `chunk` and writes a long run of digits
    with open(fpath, "w") as f:
        for x in range(10000000):
            f.write(str(x))

if __name__ == "__main__":
    # Guard required where multiprocessing uses the "spawn" start method
    os.makedirs("./tmp", exist_ok=True)  # output directory must exist
    futures = []
    print("my pid:", os.getpid())
    input("Hit return to start:")
    start = time.time()
    print("Started at:", start)
    for i in range(N_CORES):
        fpath = './tmp/file-' + str(i) + '.csv'
        p = Process(target=write_chunk_to_file, args=(i, fpath))
        futures.append(p)
    for p in futures:
        p.start()
    print("All jobs started.")
    for p in futures:
        p.join()
    print("All jobs finished at ", time.time())
You can monitor the jobs with this shell command in another window:
while true; do clear; pstree 12345; ls -l tmp; sleep 1; done
(Replace 12345 with the pid emitted by the script.)