How to Multi-thread an Operation Within a Loop in Python


Question

Say I have a very large list and I'm performing an operation like so:

for item in items:
    try:
        api.my_operation(item)
    except:
        print 'error with item'

My problem is twofold:

  • There are a lot of items
  • api.my_operation takes forever to return

I'd like to use multi-threading to spin up a bunch of api.my_operation calls at once so I can process maybe 5 or 10 or even 100 items at a time.

If my_operation() raises an exception (because maybe I already processed that item), that's OK. It won't break anything. The loop can continue to the next item.

Note: this is for Python 2.7.3

Recommended answer

First, in Python, if your code is CPU-bound, multithreading won't help, because only one thread at a time can hold the Global Interpreter Lock, and therefore run Python code. So, you need to use processes, not threads.

This is not true if your operation "takes forever to return" because it's IO-bound (that is, waiting on the network, disk copies, or the like). I'll come back to that later.

Next, the way to process 5 or 10 or 100 items at once is to create a pool of 5 or 10 or 100 workers, and put the items into a queue that the workers service. Fortunately, the stdlib multiprocessing and concurrent.futures libraries both wrap up most of the details for you.
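To make the pool-and-queue idea concrete, here is a minimal hand-rolled sketch of roughly what the library pools take care of. It is written for Python 3 (on 2.7 the queue module is named Queue), and fake_operation is a hypothetical stand-in for api.my_operation, not something from the question.

```python
import queue
import threading

_STOP = object()  # unique sentinel telling a worker to exit


def fake_operation(item):
    # Hypothetical stand-in for api.my_operation: fails on odd items.
    if item % 2:
        raise ValueError('already processed')


def run_pool(items, num_workers=5):
    """Process items with a fixed number of worker threads fed by one queue."""
    work = queue.Queue()
    failed = []  # list.append is thread-safe in CPython

    def worker():
        while True:
            item = work.get()
            if item is _STOP:
                break
            try:
                fake_operation(item)
            except Exception:
                failed.append(item)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for item in items:
        work.put(item)
    for _ in threads:
        work.put(_STOP)  # one sentinel per worker
    for t in threads:
        t.join()
    return sorted(failed)
```

The library executors replace all of this bookkeeping (starting workers, feeding the queue, shutting down) with a couple of calls.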

The former is more powerful and flexible for traditional programming; the latter is simpler if you need to compose waits on several futures; for trivial cases, it really doesn't matter which you choose. (In this case, the most obvious implementation takes 3 lines with futures and 4 lines with multiprocessing.)
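For comparison, here is a sketch of the multiprocessing flavor. It assumes the work is IO-bound and therefore uses multiprocessing.dummy.Pool, the thread-backed class that mirrors the multiprocessing.Pool interface. slow_operation is a hypothetical placeholder for api.my_operation, and the wrapper returns the failed item instead of printing so the caller can inspect the outcome.

```python
import time
from multiprocessing.dummy import Pool  # thread-backed, same API as multiprocessing.Pool


def slow_operation(item):
    # Hypothetical placeholder for api.my_operation.
    time.sleep(0.01)
    if item == 3:
        raise ValueError('already processed')


def try_slow_operation(item):
    """Return the item if it failed, otherwise None."""
    try:
        slow_operation(item)
    except Exception:
        return item
    return None


def run_with_pool(items, workers=10):
    pool = Pool(workers)
    try:
        # map blocks until every item has been handled.
        results = pool.map(try_slow_operation, items)
    finally:
        pool.close()
        pool.join()
    return [r for r in results if r is not None]
```

Switching to real processes would mean importing Pool from multiprocessing instead, with the usual requirement that the function and its arguments be picklable.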

If you're using 2.6-2.7 or 3.0-3.1, futures isn't built in, but you can install it from PyPI (pip install futures).

Finally, it's usually a lot simpler to parallelize things if you can turn the entire loop iteration into a function call (something you could, e.g., pass to map), so let's do that first:

def try_my_operation(item):
    try:
        api.my_operation(item)
    except Exception:  # a bare except would also swallow KeyboardInterrupt
        print('error with item')


Putting it all together:

import concurrent.futures

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)
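A self-contained variant of the snippet above, offered as a sketch only. FlakyApi is a made-up stand-in for the real api object, and a ThreadPoolExecutor is used so the example runs without pickling concerns; ProcessPoolExecutor accepts the same calls. Instead of printing, the wrapper returns a flag so the caller can see which items failed.

```python
import concurrent.futures


class FlakyApi(object):
    """Made-up stand-in for the real api object."""

    def my_operation(self, item):
        if item in ('b', 'd'):
            raise RuntimeError('already processed')


api = FlakyApi()


def try_my_operation(item):
    """Return True on success, False if the operation raised."""
    try:
        api.my_operation(item)
        return True
    except Exception:
        return False


def process_all(items, max_workers=10):
    executor = concurrent.futures.ThreadPoolExecutor(max_workers)
    try:
        # Map each future back to its item so failures can be reported.
        futures = {executor.submit(try_my_operation, item): item
                   for item in items}
        concurrent.futures.wait(futures)
    finally:
        executor.shutdown()
    return sorted(item for fut, item in futures.items() if not fut.result())
```

Calling process_all(['a', 'b', 'c', 'd', 'e']) returns the items whose operation raised, which for this stand-in are 'b' and 'd'.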


If you have lots of relatively small jobs, the overhead of multiprocessing might swamp the gains. The way to solve that is to batch up the work into larger jobs. For example (using grouper from the itertools recipes, which you can copy and paste into your code, or get from the more-itertools project on PyPI):

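import concurrent.futures

def try_multiple_operations(items):
    for item in items:
        try:
            api.my_operation(item)
        except Exception:
            print('error with item')

# Argument order varies by version: older itertools recipes use
# grouper(n, iterable), as here; current ones use grouper(iterable, n).
# grouper pads the last group with None when the items don't divide evenly.
executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_multiple_operations, group)
           for group in grouper(5, items)]
concurrent.futures.wait(futures)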
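One caveat with grouper is that it pads the final group with a fill value (None by default), so the worker also gets called on the padding. A padding-free alternative is sketched below. chunked is a small local helper written for this example (more-itertools ships a function of the same name), fake_operation is hypothetical, and threads are used only so the sketch runs anywhere; for CPU-bound work you would swap in ProcessPoolExecutor.

```python
import concurrent.futures
from itertools import islice


def chunked(iterable, size):
    """Yield lists of up to `size` items; the last one may be shorter."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def fake_operation(item):
    # Hypothetical stand-in for api.my_operation.
    if item % 4 == 0:
        raise ValueError('already processed')


def try_multiple_operations(items):
    """Run one batch and return the items that raised."""
    failed = []
    for item in items:
        try:
            fake_operation(item)
        except Exception:
            failed.append(item)
    return failed


def process_in_batches(items, batch_size=5, max_workers=10):
    with concurrent.futures.ThreadPoolExecutor(max_workers) as executor:
        futures = [executor.submit(try_multiple_operations, group)
                   for group in chunked(items, batch_size)]
        failed = []
        for future in futures:  # submission order keeps the output stable
            failed.extend(future.result())
    return failed
```

Each task now handles up to batch_size items, so the per-task overhead is paid once per batch rather than once per item.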


Finally, what if your code is IO-bound? Then threads are just as good as processes, and with less overhead (and fewer limitations, but those limitations usually won't affect you in cases like this). Sometimes that "less overhead" is enough to mean you don't need batching with threads, but you do with processes, which is a nice win.

So, how do you use threads instead of processes? Just change ProcessPoolExecutor to ThreadPoolExecutor.
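As a rough illustration of why threads suit IO-bound work, the sketch below simulates a slow call with time.sleep (a hypothetical stand-in for a network round trip) and runs 20 of them on 20 threads. The names slow_call and run_threaded are invented for this example.

```python
import concurrent.futures
import time


def slow_call(item):
    # Hypothetical stand-in for an IO-bound api.my_operation.
    time.sleep(0.1)
    return item * 2


def run_threaded(items, max_workers=20):
    with concurrent.futures.ThreadPoolExecutor(max_workers) as executor:
        # map preserves input order and blocks until every result is ready.
        return list(executor.map(slow_call, items))


start = time.time()
results = run_threaded(range(20))
elapsed = time.time() - start
print('20 calls took %.2fs' % elapsed)
```

Run one after another, the same 20 calls would sleep for about 2 seconds in total; with 20 threads the sleeps overlap, because a sleeping (or network-waiting) thread releases the GIL.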

If you're not sure whether your code is CPU-bound or IO-bound, just try it both ways.
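One way to try it both ways is to time the same workload under each executor class. The helper below is a sketch; time_with, compare, and work are names invented here, and work is just a sample CPU-bound function. When ProcessPoolExecutor is involved, the function and its arguments must be picklable, and on platforms that start fresh interpreters the call to compare should sit under an if __name__ == '__main__': guard.

```python
import concurrent.futures
import time


def work(n):
    # Invented sample workload: a small CPU-bound sum of squares.
    return sum(i * i for i in range(n))


def time_with(executor_cls, func, items, max_workers=4):
    """Return (seconds, results) for running func over items."""
    start = time.time()
    with executor_cls(max_workers) as executor:
        results = list(executor.map(func, items))
    return time.time() - start, results


def compare(func, items):
    """Print the wall-clock time for threads and for processes."""
    for cls in (concurrent.futures.ThreadPoolExecutor,
                concurrent.futures.ProcessPoolExecutor):
        seconds, _ = time_with(cls, func, items)
        print('%s: %.2fs' % (cls.__name__, seconds))
```

For example, compare(work, [200000] * 8) prints one timing line per executor; whichever is clearly faster for your real operation is the one to keep.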

Can I do this for multiple functions in my Python script? For example, if I had another for loop elsewhere in the code that I wanted to parallelize. Is it possible to do two multithreaded functions in the same script?

Yes. In fact, there are two different ways to do it.

First, you can share the same (thread or process) executor and use it from multiple places with no problem. The whole point of tasks and futures is that they're self-contained; you don't care where they run, just that you queue them up and eventually get the answer back.
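A sketch of this first approach, with one executor serving two unrelated kinds of work. The functions fetch and store are placeholders invented for illustration.

```python
import concurrent.futures

# One pool for the whole program; every part of the code submits to it.
executor = concurrent.futures.ThreadPoolExecutor(10)


def fetch(item):
    # Placeholder for the first loop's operation.
    return item.upper()


def store(record):
    # Placeholder for the second loop's operation.
    return len(record)


# Both batches are queued on the same pool at the same time.
fetch_futures = [executor.submit(fetch, i) for i in ['a', 'bb', 'ccc']]
store_futures = [executor.submit(store, r) for r in ['x', 'yy']]

fetched = [f.result() for f in fetch_futures]
sizes = [f.result() for f in store_futures]
executor.shutdown()
```

Neither batch needs to know about the other; each just holds on to its own futures and asks them for results.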

Alternatively, you can have two executors in the same program with no problem. This has a performance cost: if you're using both executors at the same time, you'll end up trying to run (for example) 16 busy threads on 8 cores, which means there's going to be some context switching. But sometimes it's worth doing because, say, the two executors are rarely busy at the same time, and it makes your code a lot simpler. Or maybe one executor is running very large tasks that can take a while to complete, and the other is running very small tasks that need to complete as quickly as possible, because responsiveness is more important than throughput for part of your program.
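A sketch of the second approach, with one pool reserved for long jobs and another for quick ones. The pool sizes and the big_job and small_job functions are arbitrary placeholders chosen for this example.

```python
import concurrent.futures
import time

bulk_pool = concurrent.futures.ThreadPoolExecutor(4)   # long-running jobs
quick_pool = concurrent.futures.ThreadPoolExecutor(2)  # latency-sensitive jobs


def big_job(n):
    time.sleep(0.2)  # placeholder for a slow task
    return n


def small_job(n):
    return n + 1  # placeholder for a quick task


# Eight big jobs on four workers: half of them have to wait in the queue.
big = [bulk_pool.submit(big_job, n) for n in range(8)]
small = [quick_pool.submit(small_job, n) for n in range(3)]

# The quick jobs have their own workers, so they are not stuck
# behind the big jobs still queued in bulk_pool.
quick_results = [f.result() for f in small]
big_results = [f.result() for f in big]

bulk_pool.shutdown()
quick_pool.shutdown()
```

Had both kinds of task shared bulk_pool, the small jobs would have queued behind the eight big ones.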

If you don't know which is appropriate for your program, usually it's the first.
