Python中的并发性 - 线程池

假设我们必须为多线程任务创建大量线程.由于线程太多,因此可能存在许多性能问题,这在计算上是最昂贵的.一个主要问题可能是吞吐量受限.我们可以通过创建一个线程池来解决这个问题.线程池可以被定义为预先实例化和空闲线程的组,其准备好被给予工作.当我们需要执行大量任务时,创建线程池优先于为每个任务实例化新线程.线程池可以管理大量线程的并发执行,如下所示;

  • 如果线程池中的线程完成它的执行然后该线程可以被重用.

  • 如果一个线程被终止,将创建另一个线程来替换该线程.

Python模块 -  Concurrent.futures

Python标准库包含 concurrent.futures 模块.该模块在Python 3.2中添加,为开发人员提供了用于启动异步任务的高级接口.它是Python的线程和多处理模块之上的抽象层,用于提供使用线程池或进程池运行任务的接口.

在接下来的部分中,我们将了解concurrent.futures模块的不同类.

执行者类

执行者的抽象类concurrent.futures Python模块.它不能直接使用,我们需要使用以下具体子类之一 :

  • ThreadPoolExecutor

  • ProcessPoolExecutor

ThreadPoolExecutor  - 具体子类

它是Executor类的具体子类之一.子类使用多线程,我们获得了一个用于提交任务的线程池.此池将任务分配给可用线程并安排它们运行.

如何创建ThreadPoolExecutor?

并发的帮助下.futures 模块及其具体子类 Executor ,我们可以轻松创建一个线程池.为此,我们需要构造一个 ThreadPoolExecutor ,其中包含我们在池中想要的线程数.默认情况下,该数字为5.然后我们可以向线程池提交任务.当我们提交()任务时,我们会返回 Future . Future对象有一个名为 done()的方法,它告诉我们未来是否已经解决.有了这个,就为该特定的未来对象设置了一个值.当任务完成时,线程池执行器将值设置为future对象.

示例

from concurrent.futures import ThreadPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ThreadPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

输出

False
True
Completed

在上面的例子中, ThreadPoolExecutor 构造了5个线程.然后,在给出消息之前等待2秒的任务被提交给线程池执行器.从输出中可以看出,任务直到2秒才完成,因此第一次调用 done()将返回False. 2秒后,任务完成,我们通过调用 result()方法得到未来的结果.

实例化ThreadPoolExecutor  -  Context Manager

实例化 ThreadPoolExecutor 的另一种方法是在上下文管理器的帮助下.它的工作方式类似于上例中使用的方法.使用上下文管理器的主要优点是它在语法上看起来很好.实例化可以在以下代码的帮助下完成;

 
,ThreadPoolExecutor(max_workers = 5)作为执行者

示例

以下示例是从Python文档中借用的.在此示例中,首先必须导入 concurrent.futures 模块.然后创建一个名为 load_url()的函数,它将加载请求的URL.然后,该函数使用池中的5个线程创建 ThreadPoolExecutor
. ThreadPoolExecutor 已被用作上下文管理器.我们可以通过调用 result()方法获得未来的结果.

import concurrent.futures
import urllib.request

URLS = ['https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/',
   'https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
   return conn.read()

with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor:

   future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
   for future in concurrent.futures.as_completed(future_to_url):
   url = future_to_url[future]
   try:
      data = future.result()
   except Exception as exc:
      print('%r generated an exception: %s' % (url, exc))
   else:
      print('%r page is %d bytes' % (url, len(data)))

输出

以下是上述Python脚本的输出 :

'https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/' page is 229313 bytes
'https://img01.yuandaxia.cn/Content/img/tutorials/concurrency_in_python/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes

使用Executor. map()函数

Python map()函数广泛用于许多任务中.一个这样的任务是将特定函数应用于迭代中的每个元素.类似地,我们可以将迭代器的所有元素映射到函数,并将它们作为独立的作业提交给 ThreadPoolExecutor .请考虑以下Python脚本示例,以了解该函数的工作原理.

示例

在下面的示例中,map函数用于应用 square()函数到values数组中的每个值.

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ThreadPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
for result in results:
      print(result)
if __name__ == '__main__':
   main()

输出

上述Python脚本生成以下输出 :

 
 4 
 9 
 16 
 25