How should I write multiple CSV files efficiently using dask.dataframe?


Question

Here is a summary of what I'm doing:

At first, I did this with plain multiprocessing and the pandas package:

import os

### Step 1. Get the file list
files = os.listdir(DATA_PATH + product)

### Step 2. Loop through the list
from multiprocessing import Pool
import pandas as pd

def readAndWriteCsvFiles(file):
    ### Step 2.1 read csv file into dataframe 
    data = pd.read_csv(DATA_PATH + product + "/" + file, parse_dates=True, infer_datetime_format=False)

    ### Step 2.2 do some calculation
    ### .......

    ### Step 2.3 write the dataframe to csv to another folder
    data.to_csv("another folder/"+file)

if __name__ == '__main__':
    cl = Pool(4)
    cl.map(readAndWriteCsvFiles, files, chunksize=1)
    cl.close()
    cl.join()  

The code works fine, but it's very slow.

It takes about 1,000 seconds to finish the task.

For comparison, an R program using library(parallel) and the parSapply function takes only about 160 seconds.

So I then tried dask.delayed and dask.dataframe with the following code:

import os

### Step 1. Get the file list
files = os.listdir(DATA_PATH + product)

### Step 2. Loop through the list
from dask.delayed import delayed
import dask.dataframe as dd
from dask import compute

def readAndWriteCsvFiles(file):
    ### Step 2.1 read csv file into dataframe 
    data = dd.read_csv(DATA_PATH + product + "/" + file, parse_dates=True, infer_datetime_format=False, assume_missing=True)

    ### Step 2.2 do some calculation
    ### .......

    ### Step 2.3 write the dataframe to csv to another folder
    data.to_csv(filename="another folder/*", name_function=lambda x: file)

compute([delayed(readAndWriteCsvFiles)(file) for file in files])

This time, I found that if I commented out step 2.3 in both the dask code and the pandas code, dask ran much faster than plain pandas with multiprocessing.

But if I invoke the to_csv method, dask is just as slow as pandas.

Is there any solution?

Thanks.

Answer

Reading and writing CSV files is often bound by the GIL. You might want to try parallelizing with processes rather than with threads (the default for dask.delayed).

You can achieve this by adding the scheduler='processes' keyword to your compute call:

compute([delayed(readAndWriteCsvFiles)(file) for file in files], scheduler='processes')
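
For example, a minimal sketch of the whole pipeline with that keyword applied (DATA_PATH, product, the output folder, and the calculation step are placeholders carried over from the question):

import os
import pandas as pd
from dask import compute, delayed

files = os.listdir(DATA_PATH + product)

def readAndWriteCsvFiles(file):
    # Plain pandas inside each task is enough here (see the note below)
    data = pd.read_csv(DATA_PATH + product + "/" + file, parse_dates=True)

    ### ... do some calculation ...

    data.to_csv("another folder/" + file)

# scheduler='processes' runs tasks in separate worker processes,
# so CSV parsing and writing are not serialized by the GIL
compute([delayed(readAndWriteCsvFiles)(file) for file in files],
        scheduler='processes')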

See the dask scheduling documentation for more information.

Also, note that you're not using dask.dataframe here, but rather dask.delayed.
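
If you do want to use dask.dataframe itself, a sketch under the assumption that all the CSVs share the same columns is to load them with a single dd.read_csv call on a glob pattern and write them back with a single to_csv call (the glob and the name_function below are illustrative, not from the original question):

import dask.dataframe as dd

# One read_csv call with a glob loads all files into a single dask dataframe,
# assuming every file has the same schema
data = dd.read_csv(DATA_PATH + product + "/*.csv", assume_missing=True)

### ... do some calculation ...

# to_csv writes one CSV per partition; name_function turns the partition
# index into the text that replaces the * in the output path
data.to_csv("another folder/output-*.csv", name_function=str)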
