Python Multiprocessing for DataFrame Operations/Functions
Question
I am processing 100,000s of rows of text data using Pandas DataFrames. Every so often (fewer than 5 per 100,000) a row raises an error, and I have chosen to drop those rows. The error-handling function is as follows:
def unicodeHandle(datai):
    for i, row in enumerate(datai['LDTEXT']):
        print(i)
        #print(text)
        try:
            text = row.read()
            text.strip().split('[\W_]+')
            print(text)
        except UnicodeDecodeError as e:
            datai.drop(i, inplace=True)
            print('Error at index {}: {!r}'.format(i, row))
            print(e)
    return datai
The function works fine, and I have been using it for a few weeks.
The problem is that I never know when the error will occur, because the data comes from a DB that is constantly being added to (or I may pull different data). The point is, I must iterate through every row to run my error-test function unicodeHandle in order to initialize my data. This process takes about 5 minutes, which gets a little annoying. I am trying to implement multiprocessing to speed up the loop. Via the web and various tutorials, I have come up with:
import multiprocessing as mp

def unicodeMP(datai):
    chunks = [datai[i::8] for i in range(8)]
    pool = mp.Pool(processes=8)
    results = pool.apply_async(unicodeHandle, chunks)
    while not results.ready():
        print("One Sec")
    return results.get()

if __name__ == "__main__":
    fast = unicodeMP(datai)
When I run it with multiprocessing, it takes the same amount of time as the regular version, even though my CPU says it is running at a WAY higher utilization. In addition, the code returns the error as a normal error instead of my completed clean DataFrame. What am I missing here?
How can I use multiprocessing for functions on DataFrames?
Answer
You can try dask for multiprocessing a DataFrame:
import dask.dataframe as dd

partitions = 7  # typically cpu_cores - 1
ddf = dd.from_pandas(df, npartitions=partitions)
# unicodeHandle takes a DataFrame and returns a DataFrame,
# so it can be mapped over each partition directly:
clean = ddf.map_partitions(unicodeHandle).compute(scheduler='processes')
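As an aside on why the original Pool attempt failed: apply_async schedules a single call, passing the list of chunks as separate positional arguments, so unicodeHandle is called with eight DataFrames at once and raises a TypeError. If you want to stay with plain multiprocessing instead of dask, the usual pattern is pool.map (one task per chunk) followed by pd.concat. A minimal sketch, where clean_chunk is a hypothetical stand-in for your unicodeHandle (dropping NaN rows rather than catching UnicodeDecodeError, just so the example is self-contained):

```python
import multiprocessing as mp

import pandas as pd

def clean_chunk(chunk):
    # Hypothetical stand-in for unicodeHandle: drop rows whose
    # LDTEXT value is missing, and return the cleaned chunk.
    bad = chunk['LDTEXT'].isna()
    return chunk[~bad]

if __name__ == "__main__":
    df = pd.DataFrame({'LDTEXT': ['a', None, 'b', 'c', None, 'd', 'e', 'f']})
    chunks = [df[i::8] for i in range(8)]        # same slicing as the question
    with mp.Pool(processes=8) as pool:
        results = pool.map(clean_chunk, chunks)  # one task per chunk
    clean = pd.concat(results).sort_index()      # reassemble in original order
    print(len(clean))
```

pool.map blocks until all chunks are done and returns the cleaned pieces in order, so there is no need for the ready()/get() polling loop.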
You can read more in the dask documentation.