python concurrent.futures.ProcessPoolExecutor: Performance of .submit() vs .map()


Problem Description


I am using concurrent.futures.ProcessPoolExecutor to find the occurrence of a number from a number range. The intent is to investigate the amount of speed-up performance gained from concurrency. To benchmark performance, I have a control - a serial code to perform said task (shown below). I have written 2 concurrent codes, one using concurrent.futures.ProcessPoolExecutor.submit() and the other using concurrent.futures.ProcessPoolExecutor.map() to perform the same task. They are shown below. Advice on drafting the former and latter can be seen here and here, respectively.

The task issued to all three codes was to find the number of occurrences of the number 5 in the number range of 0 to 1E8. Both .submit() and .map() were assigned 6 workers, and .map() had a chunksize of 10,000. The manner of discretising the workload was identical in the two concurrent codes. However, the functions used to find occurrences differed between the two, because the way arguments are passed to a function called by .submit() differs from the way .map() passes them.
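The contrast can be seen in a minimal sketch (illustrative helper names, not the benchmark code itself): .submit() passes one chunk's bounds as plain positional arguments, while .map() consumes iterables element-wise.

import concurrent.futures as cf
import itertools

def per_chunk(nmin, nmax, number):   # signature shape used with .submit()
    return [n for n in range(nmin, nmax) if number in str(n)]

def per_element(n, number):          # signature shape used with .map()
    return n if number in str(n) else None

if __name__ == '__main__':
    with cf.ProcessPoolExecutor(max_workers=2) as executor:
        # .submit(): one call per chunk, arguments passed directly
        chunk_hits = executor.submit(per_chunk, 0, 100, '5').result()
        # .map(): one call per element, arguments drawn from iterables
        map_hits = [m for m in executor.map(per_element, range(100),
                                            itertools.repeat('5'),
                                            chunksize=10)
                    if m is not None]
    print(chunk_hits == map_hits)  # True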

All 3 codes reported the same number of occurrences, i.e. 56,953,279 times. However, the time taken to complete the task was very different. .submit() performed 2 times faster than the control, while .map() took twice as long as the control to complete its task.

Questions:

  1. I would like to know if the slow performance of .map() is an artifact of my coding or if it is inherently slow. If the former, how can I improve it? I am just surprised that it performed slower than the control, as there would be little incentive to use it.
  2. I would like to know if there is any way to make the .submit() code perform even faster. A condition I have is that the function _concurrent_submit() must return an iterable with the numbers/occurrences containing the number 5.

Benchmark Results

concurrent.futures.ProcessPoolExecutor.submit()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from traceback import print_exc

def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    print('\n def _findmatch', nmin, nmax, number)
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

def _concurrent_submit(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
        # 2.2. Instruct workers to process results as they come, when all are
        #      completed or .....
        cf.as_completed(futures) # faster than cf.wait()
        # 2.3. Consolidate result as a list and return this list.
        for future in futures:
            for f in future.result():
                try:
                    found.append(f)
                except:
                    print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent_submit():')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_submit(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(a),end))

concurrent.futures.ProcessPoolExecutor.map()

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc

def _findmatch(listnumber, number):    
    '''Function to find the occurrence of number in another number and return
       a string value.'''
    #print('def _findmatch(listnumber, number):')
    #print('listnumber = {0} and ref = {1}'.format(listnumber, number))
    if number in str(listnumber):
        x = listnumber
        #print('x = {0}'.format(x))
        return x 

def _concurrent_map(nmax, number, workers):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunk = nmax // workers
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            numberlist = range(cstart, cstop)
            futures.append(executor.map(_findmatch, numberlist,
                                        itertools.repeat(number),
                                        chunksize=10000))
        # 2.3. Consolidate result as a list and return this list.
        for future in futures:
            for f in future:
                if f:
                    try:
                        found.append(f)
                    except:
                        print_exc()
        foundsize = len(found)
        end = time() - start
        print('within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers

    start = time()
    a = _concurrent_map(nmax, number, workers)
    end = time() - start
    print('\n main')
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(a),end))

Serial Code:

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

from time import time

def _serial(nmax, number):    
    start = time()
    match=[]
    nlist = range(nmax)
    for n in nlist:
        if number in str(n):match.append(n)
    end=time()-start
    print("found {0} in {1:.4f}sec".format(len(match),end))
    return match

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.

    start = time()
    a = _serial(nmax, number)
    end = time() - start
    print('\n main')
    print("found {0} in {1:.4f}sec".format(len(a),end))

Update 13th Feb 2017:

In addition to @niemmi's answer, I have provided an answer following some personal research to show:

  1. how to further speed-up @niemmi's .map() and .submit() solutions, and
  2. when ProcessPoolExecutor.map() can lead to more speed-up than ProcessPoolExecutor.submit().

Solution

Overview:

There are 2 parts to my answer:

  • Part 1 shows how to gain more speed-up from @niemmi's ProcessPoolExecutor.map() solution.
  • Part 2 shows when the ProcessPoolExecutor methods .submit() and .map() yield non-equivalent compute times.

=======================================================================

Part 1: More Speed-up for ProcessPoolExecutor.map()

Background: This section builds on @niemmi's .map() solution, which by itself is excellent. While doing some research on his discretization scheme to better understand how it interacts with the .map() chunksize argument, I found this interesting solution.

I regard @niemmi's definition of chunk = nmax // workers to be a definition for chunksize, i.e. the size of the slice of the actual number range (the given task) to be tackled by each worker in the worker pool. Now, this definition is premised on the assumption that if a computer has x workers, dividing the task equally among them will result in optimum use of each worker, and hence the total task will be completed fastest. Therefore, the number of chunks to break a given task into should always equal the number of pool workers. However, is this assumption correct?
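For concreteness, the equal-split assumption produces these chunk boundaries for the experiment's parameters (a small illustration using the values from the code below, nothing more):

nmax, workers = int(1E8), 6
chunk = nmax // workers  # 16,666,666 numbers per worker
ranges = [(chunk * i, chunk * (i + 1) if i != workers - 1 else nmax)
          for i in range(workers)]
print(ranges[0], ranges[-1])  # (0, 16666666) (83333330, 100000000)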

Proposition: Here, I propose that the above assumption does not always lead to the fastest compute time when used with ProcessPoolExecutor.map(). Rather, discretising a task into a number of chunks greater than the number of pool workers can lead to speed-up, i.e. faster completion of a given task.

Experiment: I have modified @niemmi's code to allow the number of discretized tasks to exceed the number of pool workers. This code is given below and is used to find the number of times the number 5 appears in the number range of 0 to 1E8. I have executed this code using 1, 2, 4, and 6 pool workers and for various ratios of the number of discretized tasks to the number of pool workers. For each scenario, 3 runs were made and the compute times were tabulated. "Speed-up" is defined here as the average compute time when using an equal number of chunks and pool workers, divided by the average compute time when the number of discretized tasks is greater than the number of pool workers.
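Expressed as code, the definition amounts to this small helper (hypothetical names, not part of the benchmark scripts):

def speedup(times_chunks_equal_workers, times_chunks_above_workers):
    '''Ratio of mean compute times, per the definition above; a value
       greater than 1.0 means the finer discretisation finished faster.'''
    mean = lambda ts: sum(ts) / len(ts)
    return mean(times_chunks_equal_workers) / mean(times_chunks_above_workers)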

Findings:

  1. The figure on the left shows the compute time taken for all the scenarios mentioned in the experiment section. It shows that the compute time taken when number of chunks / number of workers = 1 is always greater than the compute time taken when number of chunks > number of workers. That is, the former case is always less efficient than the latter.

  2. The figure on the right shows that a speed-up of 1.2 times or more was gained when the number of chunks / number of workers reached a threshold value of 14 or more. It is interesting to observe that the speed-up trend also occurred when ProcessPoolExecutor.map() was executed with 1 worker.

Conclusion: When customizing the number of discrete tasks that ProcessPoolExecutor.map() should use to solve a given task, it is prudent to ensure that this number is greater than the number of pool workers, as this practice shortens compute time.

concurrent.futures.ProcessPoolExecutor.map() code. (revised parts only)

def _concurrent_map(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a parallelised
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    futures = []
    found =[]
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop,
                               itertools.repeat(number))
        # 2.2. Consolidate result as a list and return this list.
        for future in futures:
            #print('type(future)=',type(future))
            for f in future:
                if f:
                    try:
                        found.append(f)
                    except:
                        print_exc()
        foundsize = len(found)
        end = time() - start
        print('\n within statement of def _concurrent(nmax, number):')
        print("found {0} in {1:.4f}sec".format(foundsize, end))
    return found

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 4     # Pool of workers
    chunks_vs_workers = 14 # A factor of >=14 can provide optimum performance
    num_of_chunks = chunks_vs_workers * workers

    start = time()
    a = _concurrent_map(nmax, number, workers, num_of_chunks)
    end = time() - start
    print('\n main')
    print('nmax={}, workers={}, num_of_chunks={}'.format(
          nmax, workers, num_of_chunks))
    print('workers = ', workers)
    print("found {0} in {1:.4f}sec".format(len(a),end))

=======================================================================

Part 2: Total compute time from using the ProcessPoolExecutor methods .submit() and .map() can be dissimilar when returning a sorted/ordered result list.

Background: I have amended both the .submit() and .map() codes to allow an "apples-to-apples" comparison of their compute times, and the ability to visualize the compute time of the main code, the compute time of the _concurrent method called by the main code to perform the concurrent operations, and the compute time of each discretized task/worker called by the _concurrent method. Furthermore, the concurrent method in these codes was structured to return an unordered or ordered list of the results directly from the future objects of .submit() and the iterator of .map(). Source code is provided below (Hope it helps you.).

Experiment: These two newly improved codes were used to perform the same experiment described in Part 1, save that only 6 pool workers were considered, and the Python built-in list and sorted functions were used to return an unordered and an ordered list of the results, respectively, to the main section of the code.

Findings:

  1. From the _concurrent method's results, we can see that the compute times of the _concurrent method, used to create all the Future objects of ProcessPoolExecutor.submit() and to create the iterator of ProcessPoolExecutor.map(), are equivalent as a function of the ratio of discretized tasks to pool workers. This result simply means that the ProcessPoolExecutor methods .submit() and .map() are equally efficient/fast.
  2. Comparing the compute times from main and from its _concurrent method, we can see that main ran longer than its _concurrent method. This is to be expected, as their time difference reflects the compute times of the list and sorted calls (and of the other methods encased within them); a sketch of one way to isolate this cost follows this list. Clearly, the list function took less compute time to return a result list than the sorted function. The average compute times of the list function for both the .submit() and .map() codes were similar, at ~0.47sec. The average compute times of the sorted function for the .submit() and .map() codes were 1.23sec and 1.01sec, respectively. In other words, the list function performed 2.62 times and 2.15 times faster than the sorted function for the .submit() and .map() codes, respectively.
  3. It is not clear why the sorted function generated an ordered list from .map() faster than from .submit() as the number of discretized tasks increased beyond the number of pool workers, save when the number of discretized tasks equaled the number of pool workers. That said, these findings show that the decision to use the equally fast .submit() or .map() methods can be encumbered by the sorted function. For example, if the intent is to generate an ordered list in the shortest time possible, the use of ProcessPoolExecutor.map() should be preferred over ProcessPoolExecutor.submit(), as .map() allows the shortest total compute time.
  4. The discretization scheme mentioned in Part 1 of my answer is shown here to speed up the performance of both the .submit() and .map() methods. The amount of speed-up can be as much as 20% over the case where the number of discretized tasks equals the number of pool workers.
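
As mentioned in finding 2, the list-vs-sorted consolidation cost can be isolated with a small harness like the following (a sketch with hypothetical names, not my exact measurement script):

from time import time
from itertools import chain

def consolidate(per_chunk_results, ordered=False):
    '''Flatten per-chunk result lists into one list and time just this
       step; ordered=True uses sorted() in place of list().'''
    start = time()
    out = (sorted if ordered else list)(chain.from_iterable(per_chunk_results))
    return out, time() - start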

Improved .map() code

#!/usr/bin/python3.5
# -*- coding: utf-8 -*-

import concurrent.futures as cf
from time import time
from itertools import repeat, chain 


def _findmatch(nmin, nmax, number):
    '''Function to find the occurrence of number in range nmin to nmax and return
       the found occurrences in a list.'''
    start = time()
    match=[]
    for n in range(nmin, nmax):
        if number in str(n):
            match.append(n)
    end = time() - start
    #print("
 def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
    #      format(nmin, nmax, number, len(match),end))
    return match

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
       find the occurrences of a given number in a number range in a concurrent
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        cstart = (chunksize * i for i in range(num_of_chunks))
        cstop = (chunksize * i if i != num_of_chunks else nmax
                 for i in range(1, num_of_chunks + 1))
        futures = executor.map(_findmatch, cstart, cstop, repeat(number))
    end = time() - start
    print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(end))
    return list(chain.from_iterable(futures)) #Return an unordered result list
    #return sorted(chain.from_iterable(futures)) #Return an ordered result list

if __name__ == '__main__':
    nmax = int(1E8) # Number range maximum.
    number = str(5) # Number to be found in number range.
    workers = 6     # Pool of workers
    chunks_vs_workers = 30 # A factor of >=14 can provide optimum performance
    num_of_chunks = chunks_vs_workers * workers

    start = time()
    found = _concurrent(nmax, number, workers, num_of_chunks)
    end = time() - start
    print('\n main')
    print('nmax={}, workers={}, num_of_chunks={}'.format(
          nmax, workers, num_of_chunks))
    #print('found = ', found)
    print("found {0} in {1:.4f}sec".format(len(found),end))    

Improved .submit() code.
This code is the same as the .map() code above, except that the _concurrent method is replaced with the following:

def _concurrent(nmax, number, workers, num_of_chunks):
    '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
       find the occurrences of a given number in a number range in a concurrent
       manner.'''
    # 1. Local variables
    start = time()
    chunksize = nmax // num_of_chunks
    futures = []
    #2. Parallelization
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # 2.1. Discretise workload and submit to worker pool
        for i in range(num_of_chunks):
            cstart = chunksize * i
            cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
    end = time() - start
    print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
    print("found in {0:.4f}sec".format(end))
    return list(chain.from_iterable(f.result() for f in cf.as_completed(
        futures))) #Return an unordered list
    #return sorted(chain.from_iterable(f.result() for f in cf.as_completed(
    #    futures))) #Return an ordered list

=======================================================================
