Best way to optimize data processing in Python

Problem description

I have the following data processing pipeline in Python 3.8:

  • About 1.3 TB of raw data stored on an SSD, subdivided into roughly 80 different, independent categories, which are further subdivided into separate ~300 MB compressed csv.gz files.
  • 3 main classes: one cleans the raw data into a readable format, the second aggregates that data and does the math I need, and the third imports the first 2, reads each csv, runs all those processes, saves the results, and iterates over the csvs. I'm constrained because it's time-series data, so I have to go sequentially: everything depends on the previous values for each category (no vectorization). I've already used Cython and Numba wherever possible. (A rough sketch of this per-category loop follows the list.)
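
For concreteness, here is a minimal sketch of the per-category loop described above. CleanerStep, Aggregator and the directory layout are hypothetical stand-ins; the question does not show the actual classes or math:

import csv
import glob
import gzip

class CleanerStep:
    def clean(self, row):
        # placeholder: turn one raw csv row into a usable record
        return row

class Aggregator:
    def __init__(self):
        self.state = None

    def update(self, record):
        # placeholder: each update depends on the previous state, which is
        # why the rows of a category must be processed strictly in order
        self.state = record

def export_category(category_dir):
    cleaner, agg = CleanerStep(), Aggregator()
    for path in sorted(glob.glob(f"{category_dir}/*.csv.gz")):
        with gzip.open(path, "rt", newline="") as f:
            for row in csv.reader(f):
                agg.update(cleaner.clean(row))
    return agg.state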

My computer has 12 cores/24 threads. Ideally, I'd want 24 instances of the program running concurrently, one on each thread, each one exporting data from 1 category sequentially, as fast as possible.

If I only need to export, for example, 3 categories, I'd want the program to run on 24 threads, and each instance could use up to 8 threads.

First, I made a script that contains the 3 classes, and a main that runs everything. If I run this by itself, it'll successfully export 1 category of data, although slowly. We'll call this script.py.

Then, I made a function (that we'll call parallelize()) that runs script.py using:

p = subprocess.Popen([str(mydir / "script.py")] + myargs,  # mydir, myargs: placeholders
                     stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                     shell=False, preexec_fn=os.setsid)
p.wait()
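
One caveat not raised in the question: with both stdout and stderr redirected to PIPE, p.wait() can deadlock once a pipe buffer fills up, because nothing reads the child's output. p.communicate() reads both streams while waiting, for example:

out, err = p.communicate()  # drains stdout/stderr and waits for script.py to exit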

Using those, I've tried the following methods, all with the same mediocre results:

  • joblib.Parallel(n_jobs=mp.cpu_count())(delayed(parallelize)(arg) for arg in args). I've tried every available backend, I've tried n_jobs=mp.cpu_count()/n_categories, and I've tried parallel_backend to specify the settings (a runnable sketch of this call, with placeholder names, follows this list).
  • The multiprocessing module with target=parallelize, followed by p.start().
  • The same multiprocessing module, but with target= a function that calls joblib.Parallel(n_jobs=mp.cpu_count()/n_categories)(delayed(parallelize)(arg) for arg in args).
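
For reference, a runnable sketch of the first attempt above, assuming joblib is installed; parallelize() and the category names are placeholders for the real subprocess wrapper and the ~80 categories:

import multiprocessing as mp
from joblib import Parallel, delayed

def parallelize(category):
    # placeholder for the real function that launches script.py for one category
    print(f"exporting {category}")

if __name__ == "__main__":
    categories = ["cat_a", "cat_b", "cat_c"]  # hypothetical category names
    Parallel(n_jobs=mp.cpu_count())(delayed(parallelize)(c) for c in categories)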

No matter how I try to do it, the result is always the same: when I start the program, all CPU cores and threads go to 100%, it starts exporting all categories at once, and it does so fast enough for my needs. Even if I'm only exporting 3 categories, it uses all 24 threads at 100%, indicating that it's making good use of the multithreading. But after just 5-10 minutes it suddenly slows down: 1 thread stays at 100%, the other 23 drop to about 10-20% usage, only 1 category is still being processed, and if I look at the processes in the Ubuntu System Monitor I see all the Python instances running at 0% CPU except for one, which runs between 10% and 16%.

If I stop the export (it saves up to the point it got to) and resume it, the same thing happens. It would be paradoxically faster to run, stop and rerun the script every 5 minutes than to just let it run for days. What is stopping my CPU from running at 100% all the time, instead of just the first 5 minutes?

I'm not using any async, threading, multithreading or multiprocessing inside my 3 classes in the script for the time being, and the slowest part of said script is iterating the csv rows.

Recommended answer

Functions like joblib.Parallel or multiprocessing.Pool.map offer an easy way to process a list of tasks on multiple cores/threads. They usually take only the script/computation and an iterable as arguments. However, both functions distribute the tasks as they see fit. Pool.map inspects the iterable and divides it across the number of cores/threads, but not necessarily into equal-sized chunks. So you could end up with core 1 having 100 tasks while the rest of your cores only have 10 tasks each. It depends on your iterable and on what the two functions assume to be an adequate split.

When the iterable is huge, splitting it can even take more time than the actual computation of the tasks. Sometimes you run out of memory before any task has started at all, just because of the splitting process.
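
To illustrate that splitting behavior, a small self-contained sketch (process_category and the category names are placeholders): chunksize=1 forces Pool.map to hand out one item at a time instead of letting it pick the chunk size from the length of the iterable and the number of workers.

from multiprocessing import Pool

def process_category(name):
    # stand-in for one category's export
    return f"done {name}"

if __name__ == "__main__":
    categories = [f"cat_{i}" for i in range(80)]
    # chunksize=1: one task per dispatch, so no worker gets a large
    # pre-assigned block of categories up front
    with Pool(processes=4) as pool:
        results = pool.map(process_category, categories, chunksize=1)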

Coming from this, I migrated to always using queues and doing the splitting manually. That way, you have full control over the splitting and the memory consumed, and you can debug the whole process.

So, in your case it would look similar to this:

def script(in_queue, out_queue):
    # keep pulling tasks until the 'STOP' sentinel is received
    for task in iter(in_queue.get, 'STOP'):
        result = do_stuff(task)  # placeholder for cleaning/aggregating one category
        out_queue.put(result)

And in your main:

import multiprocessing

if __name__ == "__main__":
    in_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()

    numProc = 4  # number of cores you like
    process = [multiprocessing.Process(target=script,
                                       args=(in_queue, out_queue))
               for x in range(numProc)]

    for p in process:
        p.start()

    for category in categories:  # categories: the list of category names
        in_queue.put(category)

    for p in process:
        in_queue.put('STOP')
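
The sketch stops after sending the sentinels; one possible way to finish it (not shown in the original answer) is to drain out_queue and then join the workers, keeping the same names:

    results = [out_queue.get() for _ in categories]  # one result per submitted task

    for p in process:
        p.join()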

With this scheme, all processes do exactly the same thing: take a task from the queue, do the computation, and put the result into another queue. If your cores all had exactly the same speed, the tasks would be completed "chronologically", one core after the other, like:

task1 -> core1
task2 -> core2
task3 -> core1
task4 -> core2

A situation like yours, 100% on the first core and 10% on all others, would only arise at the very end, when nearly all tasks are done.
