concurrent.futures not parallelizing write

Question

I have a list dataframe_chunk which contains chunks of a very large pandas dataframe. I would like to write every single chunk into a different csv, and to do so in parallel. However, I see the files being written sequentially and I'm not sure why this is the case. Here's the code:

import concurrent.futures as cfu
import time

# N_CORES and dataframe_chunk (the list of DataFrame chunks) are defined elsewhere.

def write_chunk_to_file(chunk, fpath):
    chunk.to_csv(fpath, sep=',', header=False, index=False)

pool = cfu.ThreadPoolExecutor(N_CORES)

futures = []
for i in range(N_CORES):
    fpath = '/path_to_files_'+str(i)+'.csv'
    futures.append(pool.submit( write_chunk_to_file(dataframe_chunk[i], fpath)))

for f in cfu.as_completed(futures):
    print("finished at ",time.time())

Any clues?

Solution

One thing that is stated in the Python 2.7.x threading docs but not in the 3.x docs is that Python cannot achieve true parallelism using the threading library: because of the Global Interpreter Lock (GIL), only one thread executes Python bytecode at a time.

You should try using concurrent.futures with the ProcessPoolExecutor which uses separate processes for each job and therefore can achieve true parallelism on a multi-core CPU.
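For example, here is a minimal sketch of the question's code switched to ProcessPoolExecutor, assuming dataframe_chunk and N_CORES are defined as in the question. Note that submit() takes the callable and its arguments separately, so the work is executed by a worker process rather than at the call site:

import concurrent.futures as cfu
import time

def write_chunk_to_file(chunk, fpath):
    chunk.to_csv(fpath, sep=',', header=False, index=False)

if __name__ == "__main__":
    # dataframe_chunk and N_CORES are assumed to exist, as in the question.
    with cfu.ProcessPoolExecutor(N_CORES) as pool:
        futures = []
        for i in range(N_CORES):
            fpath = '/path_to_files_' + str(i) + '.csv'
            # Pass the function and its arguments separately so the call
            # runs in a worker process, not in the submitting thread.
            futures.append(pool.submit(write_chunk_to_file, dataframe_chunk[i], fpath))

        for f in cfu.as_completed(futures):
            print("finished at", time.time())

Each submitted chunk is pickled and sent to its worker process, so there is some copying overhead for very large frames, but the writes themselves can proceed in parallel.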

Update

Here is your program adapted to use the multiprocessing library instead:

#!/usr/bin/env python3

from multiprocessing import Process

import os
import time

N_CORES = 8

def write_chunk_to_file(chunk, fpath):
    # Dummy workload standing in for chunk.to_csv(): the chunk argument is
    # ignored; each worker just writes a long run of numbers to its own file.
    with open(fpath, "w") as f:
        for x in range(10000000):
            f.write(str(x))

if __name__ == "__main__":
    futures = []

    print("my pid:", os.getpid())
    input("Hit return to start:")

    start = time.time()
    print("Started at:", start)

    os.makedirs("./tmp", exist_ok=True)  # make sure the output directory exists

    for i in range(N_CORES):
        fpath = './tmp/file-' + str(i) + '.csv'
        p = Process(target=write_chunk_to_file, args=(i, fpath))
        futures.append(p)

    for p in futures:
        p.start()

    print("All jobs started.")

    for p in futures:
        p.join()

    print("All jobs finished at", time.time())

You can monitor the jobs with this shell command in another window:

while true; do clear; pstree 12345; ls -l tmp; sleep 1; done

(Replace 12345 with the pid emitted by the script.)
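For completeness, a minimal sketch of the same multiprocessing approach pointed back at the real DataFrame chunks, assuming dataframe_chunk is the list of pandas DataFrames from the question:

from multiprocessing import Process
import os

def write_chunk_to_file(chunk, fpath):
    chunk.to_csv(fpath, sep=',', header=False, index=False)

if __name__ == "__main__":
    # dataframe_chunk is assumed to be the list of DataFrame chunks from the question.
    os.makedirs("./tmp", exist_ok=True)

    jobs = []
    for i, chunk in enumerate(dataframe_chunk):
        p = Process(target=write_chunk_to_file,
                    args=(chunk, './tmp/file-' + str(i) + '.csv'))
        jobs.append(p)
        p.start()

    for p in jobs:
        p.join()

When the start method is spawn (the default on Windows and on recent macOS), each chunk is pickled and copied into its worker process, which adds overhead for very large frames; on Linux with fork, the chunks are inherited from the parent's memory.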
