Read, process and concatenate pandas dataframes in parallel with dask


Question

I'm trying to read and process a list of csv files in parallel and concatenate the output into a single pandas dataframe for further processing.

My workflow consists of 3 steps:

  • create a series of pandas dataframes by reading a list of csv files (all with the same structure)

def loadcsv(filename):
    df = pd.read_csv(filename)
    return df

  • for each dataframe, create a new column by processing 2 existing columns

def makegeom(a, b):
    return 'Point(%s %s)' % (a, b)

def applygeom(df):
    df['Geom'] = df.apply(lambda row: makegeom(row['Easting'], row['Northing']), axis=1)
    return df

  • concatenate all the dataframes into a single dataframe

frames = []
for i in csvtest:
    df = applygeom(loadcsv(i))
    frames.append(df)
mergedresult1 = pd.concat(frames)

In my workflow I use pandas (each of the 15 csv files has more than 2*10^6 data points), so it takes a while to complete. I think this kind of workflow should take advantage of some parallel processing (at least for the read_csv and apply steps), so I gave dask a try, but I was not able to use it properly. In my attempt I didn't gain any improvement in speed.

I made a simple notebook to replicate what I'm doing:

https://gist.github.com/epifanio/72a48ca970a4291b293851ad29eadb50

My question is ... what's the proper way to use dask to accomplish my use case?

Answer

Pandas

In Pandas I would use the apply method

In [1]: import pandas as pd

In [2]: df = pd.DataFrame({'a': [1, 2, 3], 'b': [3, 2, 1]})

In [3]: def makegeom(row):
   ...:      a, b = row
   ...:      return 'Point(%s %s)' % (a, b)
   ...: 

In [4]: df.apply(makegeom, axis=1)
Out[4]: 
0    Point(1 3)
1    Point(2 2)
2    Point(3 1)
dtype: object

Dask.dataframe

In dask.dataframe you can do the same thing

In [5]: import dask.dataframe as dd

In [6]: ddf = dd.from_pandas(df, npartitions=2)

In [7]: ddf.apply(makegeom, axis=1).compute()
Out[7]: 
0    Point(1 3)
1    Point(2 2)
2    Point(3 1)

Add new series

In either case you can then add the new series to the dataframe

df['geom'] = df[['a', 'b']].apply(makegeom, axis=1)

Create

If you have CSV data then I would use the dask.dataframe.read_csv function

ddf = dd.read_csv('filenames.*.csv')

If you have other kinds of data then I would use dask.delayed
