Concurrent.futures usage guide - a simple example of using both threading and processing


Problem Description

I want to enable parallel processing / threading of my program using the concurrent.futures module.

Unfortunately I can't seem to find any nice, simple, idiot-proof examples of using the concurrent.futures module. They typically require more advanced knowledge of python or processing/threading concepts and jargon.

The below is a simplified, self-contained example based on my program: there's a purely CPU-bound task ideal for multiprocessing, and a separate IO-bound task inserting into a database (SQLite). In my program I've already converted this to use the multiprocessing pool class, but because the results from the CPU-bound task are all collected up waiting for the tasks to finish, it uses massive amounts of memory. Thus I'm looking to use a combination of threading/processing which I believe concurrent.futures can do for me fairly simply.

So how do I convert the below into something that uses this module?

import sqlite3

# Stand-in CPU-intensive task
def calculate(value):
    return value * 10

# Stand-in thread I/O-intensive task
def output(value):
    global db

    if (value % 1000) == 0:
        db.execute('delete from test_table')

    db.execute('insert into test_table (result) values (?)', (value,))

def main():
    global db
    results = []

    db = sqlite3.connect('e:\\z_dev\\test.sqlite')
    db.cursor()

    #=========
    #Perform CPU intensive task
    for i in range(1000):
        results.append( calculate(i))

    #Perform Threading intensive task
    for a in results:
        output(a)
    #=========

    db.commit()
    db.close()

if __name__ == '__main__':
    main()

I'm looking for an answer that doesn't use any fancy/complex python. Or a nice clear simple explanation, or ideally both!

Thanks

Edit: My current "multiprocessor" implementation. Probably wrong, but it seems to work. No threading whatsoever. This goes inside the "#=========" part of the above.

#Multiprocessing
pool = multiprocessing.Pool(None)
for i in range(1000):
    # pass the function and its arguments separately;
    # writing calculate(i) here would run the task in the main process
    results.append(pool.apply_async(calculate, (i,)))
pool.close()
pool.join()

# collect the actual values from the AsyncResult objects
for i in range(len(results)):
    results[i] = results[i].get()

#Complete lack of threading; but if I had it, it'd be here:
for a in results:
    output(a)
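As an aside on the memory problem: one way to avoid collecting every result before output is `Pool.imap()`, which yields results lazily as they become ready instead of building the full list. This is a minimal sketch, not the asker's program; `calculate` is the same stand-in task, and the database call is replaced by a placeholder accumulator:

```python
import multiprocessing

# Stand-in CPU-intensive task (same as in the question)
def calculate(value):
    return value * 10

if __name__ == '__main__':
    total = 0
    pool = multiprocessing.Pool(None)
    # imap yields each result as soon as it is ready (in submission
    # order), so the full result list never sits in memory at once
    for result in pool.imap(calculate, range(1000), chunksize=100):
        total += result   # a real program would call output(result) here
    pool.close()
    pool.join()
    print(total)          # prints 4995000
```

This doesn't add threading, but it does stream results to the consumer as they arrive, which is the same back-pressure idea the accepted answer builds with an explicit queue.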

Recommended Answer

concurrent.futures has a minimalistic API. It's easy to use for very straightforward problems, but you don't have a very straightforward problem. If you did, you would already have solved it ;-)

You didn't show any of the multiprocessing.Pool code you wrote, but that would be a more promising place to start - assuming you want to solve the problem more than you want to confirm your hope that it must be easy to do if only you switched to a weaker API ;-)

"An obvious" way to proceed using multiprocessing is to use the Pool.apply_async() method, put the async result objects on a bounded Queue.Queue, and have threads in your main program pull those off the Queue and wait for the results to show up. This is easy enough, but it's not magic. It solves your problem because bounded Queues are the canonical way to mediate between producers and consumers that run at different speeds. Nothing in concurrent.futures addresses that problem directly, and it's at the heart of your "massive amounts of memory" problem.

# Define global result_queue only in the main program.
import queue   # Python 3; this was `import Queue` in Python 2
result_queue = queue.Queue(100)  # pick a reasonable max size based on your problem

# Run this in as many threads as you like.
def consume_results():
    while True:
        a = result_queue.get()
        if a is None:
            break
        output(a.get())  # `output()` is your function

...
# main program passes out work, after starting threads
for i in range(1000):
    # the .put() will block so long as the queue is at its max size
    result_queue.put(pool.apply_async(calculate, args=(i,)))
# add sentinels to let threads know they're done
for i in range(number_of_threads_you_started):
    result_queue.put(None)

That's the kind of thing you need to keep producers and consumers roughly in balance, and there's nothing in any standard library that will do it for you by magic.
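The blocking behavior of a bounded queue can be seen in isolation with a toy sketch (no worker processes involved; `consume` is a stand-in consumer thread):

```python
import queue
import threading

q = queue.Queue(2)          # producer blocks once 2 items are waiting
consumed = []

def consume():
    while True:
        item = q.get()
        if item is None:    # sentinel: producer is done
            break
        consumed.append(item)

t = threading.Thread(target=consume)
t.start()
for i in range(10):
    q.put(i)                # blocks whenever the consumer is 2 items behind
q.put(None)                 # tell the consumer to stop
t.join()
print(consumed)             # [0, 1, 2, ..., 9]
```

The `q.put(i)` call is where the back-pressure happens: a fast producer simply stalls until the consumer catches up, so at most `maxsize` items are ever in flight.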

Edit - fleshing it out

Here's a complete, executable example anyone with Python3 can run. Notes:

  • It doesn't use your code fragments, because those rely on an external database module not everyone can run.
  • It sticks to concurrent.futures to manage both processes and threads. It's not really harder to use multiprocessing and threading instead, and indeed the way threads are used here it would be a little easier using threading directly. But this way is clear enough.
  • A concurrent.futures Future object is basically the same thing as a multiprocessing async result object - the API functionalities are just spelled differently.
  • Your problem is not straightforward, because it has multiple stages that can run at different speeds. Again, nothing in any standard library can hide the potentially bad consequences of that by magic. Creating your own bounded queue remains the best solution to that. Memory use here will remain modest for any sane value of MAX_QUEUE_SIZE.
  • You generally don't want more CPU-bound worker processes than the number of cores you have available, minus one. The main program also needs cycles to run, and so does the OS.
  • Once you're used to this stuff, all the comments in this code would be annoying, like seeing the comment "increment by 1" on the code line i += 1 ;-)
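To make the Future-vs-async-result point above concrete, here's a minimal side-by-side sketch (`double` is a stand-in task; a ThreadPool is used on the multiprocessing side purely for an apples-to-apples comparison):

```python
import concurrent.futures
import multiprocessing.pool

def double(x):              # stand-in task
    return x * 2

if __name__ == '__main__':
    # concurrent.futures spelling: submit() -> Future, read with .result()
    with concurrent.futures.ThreadPoolExecutor(1) as ex:
        fut = ex.submit(double, 21)
        print(fut.result())          # 42

    # multiprocessing spelling: apply_async() -> AsyncResult, read with .get()
    with multiprocessing.pool.ThreadPool(1) as tp:
        res = tp.apply_async(double, (21,))
        print(res.get())             # 42
```

Both objects represent "a result that isn't here yet"; only the method names differ, which is why the queue-of-futures pattern below works the same way it would with Pool.apply_async().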

Here's the code:

import concurrent.futures as cf
import threading
import queue

NUM_CPUS = 3
NUM_THREADS = 4
MAX_QUEUE_SIZE = 20

# Runs in worker processes.
def producer(i):
    return i + 10

def consumer(i):
    global total
    # We need to protect this with a lock because
    # multiple threads in the main program can
    # execute this function simultaneously.
    with sumlock:
        total += i

# Runs in threads in main program.
def consume_results(q):
    while True:
        future = q.get()
        if future is None:
            break
        else:
            consumer(future.result())

if __name__ == "__main__":
    sumlock = threading.Lock()
    result_queue = queue.Queue(MAX_QUEUE_SIZE)
    total = 0
    NUM_TO_DO = 1000
    with cf.ThreadPoolExecutor(NUM_THREADS) as tp:
        # start the threads running `consume_results`
        for _ in range(NUM_THREADS):
            tp.submit(consume_results, result_queue)
        # start the worker processes
        with cf.ProcessPoolExecutor(NUM_CPUS) as pp:
            for i in range(NUM_TO_DO):
                # blocks until the queue size <= MAX_QUEUE_SIZE
                result_queue.put(pp.submit(producer, i))
        # tell threads we're done
        for _ in range(NUM_THREADS):
            result_queue.put(None)
    print("got", total, "expected", (10 + NUM_TO_DO + 9) * NUM_TO_DO // 2)

If all is well, this is the expected output:

got 509500 expected 509500

