Python中的并发 - 进程池

可以按照我们创建和使用线程池的相同方式创建和使用进程池.可以将进程池定义为预先实例化和空闲进程的组,这些进程随时可以进行工作.当我们需要执行大量任务时,创建进程池优先于为每个任务实例化新进程.

Python模块 -  Concurrent.futures

Python标准库有一个名为 concurrent.futures 的模块.该模块在Python 3.2中添加,为开发人员提供了用于启动异步任务的高级接口.它是Python的线程和多处理模块之上的抽象层,用于提供使用线程池或进程池运行任务的接口.

在接下来的部分中,我们将看一下concurrent.futures模块的不同子类.

执行者类

执行者的抽象类concurrent.futures Python模块.它不能直接使用,我们需要使用以下具体子类之一 :

  • ThreadPoolExecutor

  • ProcessPoolExecutor

ProcessPoolExecutor  - 具体的子类

它是Executor类的具体子类之一.它使用多处理,我们获得了一组用于提交任务的流程.此池将任务分配给可用进程并安排它们运行.

如何创建ProcessPoolExecutor?

的帮助下concurrent.futures 模块及其具体子类 Executor ,我们可以轻松创建一个流程池.为此,我们需要构建一个 ProcessPoolExecutor ,其中包含我们在池中所需的进程数.默认情况下,数字为5.然后将任务提交到流程池.

示例

我们现在将考虑相同的示例我们在创建线程池时使用的唯一区别是现在我们将使用 ProcessPoolExecutor 而不是 ThreadPoolExecutor .

from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ProcessPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

输出

False
False
Completed

在上面的示例中,Process PoolExecutor 已经构建了5个线程.然后,在给出消息之前等待2秒的任务被提交给进程池执行器.从输出中可以看出,任务直到2秒才完成,因此第一次调用 done()将返回False. 2秒后,任务完成,我们通过调用 result()方法得到未来的结果.

实例化ProcessPoolExecutor  -  Context Manager

实例化ProcessPoolExecutor的另一种方法是在上下文管理器的帮助下.它的工作方式类似于上例中使用的方法.使用上下文管理器的主要优点是它在语法上看起来很好.实例化可以在以下代码的帮助下完成;

with ProcessPoolExecutor(max_workers = 5) as executor

示例

为了更好地理解,我们采用与创建线程池时相同的示例.在此示例中,我们需要先导入 concurrent.futures 模块.然后创建一个名为 load_url()的函数,它将加载请求的URL.然后使用池中的5个线程创建 ProcessPoolExecutor . Process PoolExecutor 已被用作上下文管理器.我们可以通过调用 result()方法获得未来的结果.

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request

URLS = ['https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/',
   'https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
      return conn.read()

def main():
   with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
      future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
      for future in concurrent.futures.as_completed(future_to_url):
      url = future_to_url[future]
      try:
         data = future.result()
      except Exception as exc:
         print('%r generated an exception: %s' % (url, exc))
      else:
         print('%r page is %d bytes' % (url, len(data)))

if __name__ == '__main__':
   main()

输出

以上Python脚本将生成以下输出 :

'https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/' page is 229476 bytes
'https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes

Executor的使用.map()函数

Python map()函数广泛用于执行许多任务.一个这样的任务是将特定函数应用于迭代中的每个元素.类似地,我们可以将迭代器的所有元素映射到函数,并将它们作为独立作业提交给 ProcessPoolExecutor .请考虑以下Python脚本示例来理解这一点.

示例

我们将考虑使用

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ProcessPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
   for result in results:
      print(result)
if __name__ == '__main__':
   main()

输出

上面的Python脚本将生成以下输出

 
 4 
 9 
 16 
 25

何时使用ProcessPoolExecutor和ThreadPoolExecutor?

现在我们已经研究了Executor类 -  ThreadPoolExecutor和ProcessPoolExecutor ,我们需要知道何时使用哪个执行者.我们需要在遇到CPU限制工作负载的情况下选择ProcessPoolExecutor,在I/O绑定工作负载的情况下选择ThreadPoolExecutor.

如果我们使用 ProcessPoolExecutor ,那么我们做不需要担心GIL,因为它使用多处理.此外,与 ThreadPoolExecution 相比,执行时间会更短.请考虑以下Python脚本示例来理解这一点.

示例

 
导入时间
 import concurrent.futures 
 value = [8000000,7000000] 
 def counting(n):
 start = time.time()
而n> ; 0:
n  -  = 1 
返回time.time() - 开始
 def main():
 start = time.time()
 with concurrent.futures.ProcessPoolExecutor()作为执行者:
代表数字,time_taken代表zip(值,executor.map(计数,值)):
 print('开始:{}所用时间:{}' .format(number,time_taken))
 print('总时间:{}'.format(time.time() -  start))
 if __name__ =='__ main__': 
 main()

输出

import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ProcessPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
   print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

输出

Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645

从上述两个程序的输出,我们可以看到使用 ProcessPoolExecutor ThreadPoolExecutor 时执行时间的差异.