How to predict multiple images in Keras at a time using multiple-processing (e.g. with different CPUs)?


Problem description

I have a lot of PNG images that I want to classify, using a trained CNN model.

To speed up the process, I would like to use multiple-processing with CPUs (I have 72 available, here I'm just using 4). I don't have a GPU available at the moment, but if necessary, I could get one.

My workflow:

  1. read in the image with openCV
  2. adapt shape and format
  3. use mymodel.predict(img) to get the probability for each class

When it comes to the prediction step, it never finishes the mymodel.predict(img) step. When I use the code without the multiprocessing module, it works fine. For the model, I'm using keras with a tensorflow backend.

# imports needed by the snippets below
import cv2
import numpy as np
from keras.models import load_model

# load model
mymodel = load_model('190704_1_fcs_plotclassifier.h5')

# use python library multiprocessing to use different CPUs
import multiprocessing as mp

pool = mp.Pool(4)

# Define callback function to collect the output in 'outcomes'
outcomes = []

def collect_result(result):
    global outcomes
    outcomes.append(result)

# Define prediction function

def prediction(img):
    img = cv2.resize(img,(49,49))
    img = img.astype('float32') / 255
    img = np.reshape(img,[1,49,49,3])       

    status = mymodel.predict(img)
    status = status[0][1]

    return(status)

# Define evaluate function

def evaluate(i,figure):

    # predict the propability of the picture to be in class 0 or 1
    img = cv2.imread(figure)
    status = prediction(img)

    outcome = [figure, status]
    return(i,outcome)

# execute multiprocessing
for i, item in enumerate(listoffigurepaths):
    pool.apply_async(evaluate, args=(i, item), callback=collect_result)
pool.close()
pool.join()

# get outcome
print(outcomes)

It would be great if somebody knows how to predict several images at a time!

I simplified my code here, but if somebody has an example how it could be done, I would highly appreciate it.

Recommended answer

Does a processing-speed
or a size-of-RAM
or a number-of-CPU-cores
or an introduced add-on processing latency matter most?
ALL OF THESE DO:

The python multiprocessing module ( and joblib does just the same ) is known to:

The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.

Yet, as with everything in our Universe, this comes at a cost:

The wish, expressed by O/P as:

To speed up the process, I would like to use multiple-processing with CPUs (I have 72 available

will, for this kind of application of a pre-trained mymodel.predict(), if sent into a Pool( 72 )-execution, almost for sure suffocate almost any hardware RAM by swapping.

Here is an example, where a "just"-Do-Nothing worker was spawned by the n_jobs = 100 directive - to see what happens ( time-wise, ~ 532+ [ms] lost; memory-allocation-wise, XYZ [GB] of RAM immediately allocated by the O/S ):
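A minimal sketch of such a do-nothing spawn test, assuming joblib is installed ( the do_nothing worker and the timing code are illustrative, not the original measurement ):

# a sketch: spawn n_jobs = 100 workers that do no useful work at all,
# just to observe the spawn latency and the RAM the O/S allocates for them
from joblib import Parallel, delayed
import time

def do_nothing( _ ):
    return None                                    # zero useful work per task

aStart = time.perf_counter()
Parallel( n_jobs = 100 )( delayed( do_nothing )( i ) for i in range( 100 ) )
print( "spawn + no-op tasks took ~{0:.0f} [ms]".format(
        1E3 * ( time.perf_counter() - aStart ) ) )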

This comes from the fact that each multiprocessing-spawned sub-process ( not a thread, as the O/P has already experienced on her own ) is first instantiated ( after an adequate add-on latency due to the O/S process/RAM-allocation management ) as a ---FULL-COPY--- of the ecosystem present inside the original python process ( the complete python interpreter + all its import-ed modules + all its internal state and data-structures, used or not ), so indeed huge amounts of RAM-allocations take place. Have you noticed the platform starting to SWAP? Count how many sub-processes were spawned until that moment - that is the ceiling of how many such sub-processes can fit in-RAM, and it has devastating performance effects if one tries ( or lets, by using the joblib-s n_jobs = -1 auto-scaling directive ) to populate more sub-processes than this SWAP-introducing number...
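To see this on a live system, a small sketch like the one below ( assuming the third-party psutil package is available; report_rss is an illustrative name ) lets each worker report its own resident-set size:

import multiprocessing as mp
import os
import psutil                                      # third-party, assumed installed

def report_rss( _ ):
    # each worker reports its own PID and resident-set size in [MB]
    rss = psutil.Process( os.getpid() ).memory_info().rss
    return ( os.getpid(), rss // ( 1024 * 1024 ) )

if __name__ == '__main__':
    with mp.Pool( 4 ) as pool:
        for pid, mb in pool.map( report_rss, range( 4 ) ):
            print( "worker PID {0:} holds ~{1:} [MB] RSS".format( pid, mb ) )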

So far so good, we have paid some time ( often, for carefully designed code, a reasonably negligible amount, if compared to fully training the whole predictor again, isn't it? ) to spawn some number of parallel processes.

If the distributed workload next goes back to one common and performance-wise singular resource ( a disk directory-tree with files ), the performance of the parallel processes goes to wrack and ruin - each of them has to wait for such a resource(!) to first get free again.

Finally, even with the "right" amount of Pool()-spawned sub-processes - an amount that prevents the O/S from starting to SWAP RAM to disk and back - the inter-process communication is extremely expensive: serialising ( Pickling/unPickling ) + enQueueing + deQueueing all the DATA-objects one has to pass there and back ( yes, even for the callback function ), so the less one sends, the faster the Pool-processing will become.
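A quick sketch of how large the pickled payloads actually are ( the array shape below just mirrors the one used in prediction(); the values printed are payload sizes only, not the enQueue/deQueue latencies ):

import pickle
import numpy as np

img_payload    = np.zeros( ( 1, 49, 49, 3 ), dtype = 'float32' )  # sending a pre-processed image
scalar_payload = 0.987                                             # sending just the class-probability

print( "image  payload ~ {0:} [B]".format( len( pickle.dumps( img_payload    ) ) ) )
print( "scalar payload ~ {0:} [B]".format( len( pickle.dumps( scalar_payload ) ) ) )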

Here, all the Pool-associated processes might benefit from independent logging of the results, which may reduce both the scale and the latency of the inter-process communication, while still consolidating the results reported by any number of workers into a common log.
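A minimal sketch of such independent, per-worker result logging ( the evaluate_and_log name and the file-name pattern are illustrative; it re-uses the prediction() and cv2 calls from the O/P code above ):

import os

def evaluate_and_log( i, figure ):
    # each worker appends to its own, PID-keyed log file - no shared lock and no
    # big result objects travelling back through the Pool's serialisation queues
    status = prediction( cv2.imread( figure ) )
    with open( "results_{0:}.log".format( os.getpid() ), "a" ) as aLogFILE:
        aLogFILE.write( "{0:}\t{1:}\t{2:}\n".format( i, figure, status ) )
    return i                                       # only a tiny index goes back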

Without hard facts ( measured durations in [us] ), one remains with just an opinion.

def prediction( img ):
    img = cv2.resize( img, ( 49, 49 ) ) 
    img = img.astype( 'float32' ) / 255
    img = np.reshape( img, [1, 49, 49, 3] )       

    status = mymodel.predict( img )
    status = status[0][1]

    return( status )

def evaluate( i, figure ):  # predict the propability of the picture to be in class 0 or 1
    img = cv2.imread( figure )
    status = prediction( img )

    outcome = [figure, status]

    return( i, outcome )
#--------------------------------------------------
from zmq import Stopwatch
aClk = Stopwatch()

aFigureNAME = listoffigurepaths[0]   # any one test image from the O/P's list of paths
#------------------------------------NOW THE COSTS OF ORIGINAL VERSION:
aListOfRESULTs = []
for iii in range( 100 ):
    #-------------------------------------------------aClk-ed---------- SECTION
    aClk.start(); _ = evaluate( 1, aFigureNAME ); A = aClk.stop()
    #-------------------------------------------------aClk-ed---------- SECTION
    print( "as-is took {0:}[us]".format( A ) );aListOfRESULTs.append( A )

#----------------------------------------------------------------------
print( [ aFun( aListOfRESULTs ) for aFun in ( np.min, np.mean, np.max ) ] )
#----------------------------------------------------------------------


Let's try something a bit else:

def eval_w_RAM_allocs_avoided( indexI, aFigureNAME ):
    return [ indexI,
             [ aFigureNAME,
               mymodel.predict( ( cv2.resize( cv2.imread( aFigureNAME ),
                                              ( 49, 49 )
                                              ).astype( 'float32' ) / 255
                                  ).reshape( [1, 49, 49, 3]
                                             )
                                )[0][1],
               ],
             ]

#------------------------------------NOW THE COSTS OF MOD-ed VERSION:
aListOfRESULTs = []
for iii in range( 100 ):
    #-------------------------------------------------aClk-ed---------- SECTION
    aClk.start()
    _ = eval_w_RAM_allocs_avoided( 1, aFigureNAME )
    B = aClk.stop()
    #-------------------------------------------------aClk-ed---------- SECTION
    print( "MOD-ed took {0:}[us] ~ {1:} x".format( B, float( B ) / A ) )
    aListOfRESULTs.append( B )
#----------------------------------------------------------------------
print( [ aFun( aListOfRESULTs ) for aFun in ( np.min, np.mean, np.max ) ] )
#----------------------------------------------------------------------


And the actual img pre-processing pipeline overhead costs:

#------------------------------------NOW THE COSTS OF THE IMG-PREPROCESSING
aListOfRESULTs = []
for iii in range( 100 ):
    #-------------------------------------------------aClk-ed---------- SECTION
    aClk.start()
    aPredictorSpecificFormatIMAGE = ( cv2.resize( cv2.imread( aFigureNAME ),
                                                  ( 49, 49 )
                                                  ).astype( 'float32' ) / 255
                                      ).reshape( [1, 49, 49, 3]
                                                 )
    C = aClk.stop()
    #-------------------------------------------------aClk-ed---------- SECTION
    print( "IMG setup took {0:}[us] ~ {1:} of A".format( C, float( C ) / A ) )
    aListOfRESULTs.append( C )

#----------------------------------------------------------------------
print( [ aFun( aListOfRESULTs ) for aFun in ( np.min, np.mean, np.max ) ] )
#----------------------------------------------------------------------


Actual file-I/O ops:

#------------------------------------NOW THE COSTS OF THE IMG-FILE-I/O-READ
aListOfRESULTs = []
for iii in range( 100 ):
    #-------------------------------------------------aClk-ed---------- SECTION
    aFileNAME = listoffigurepaths[158 + iii * 172]
    aClk.start()
    _ = cv2.imread( aFileNAME )
    F = aClk.stop()
    #-------------------------------------------------aClk-ed---------- SECTION
    print( "aFileIO took {0:}[us] ~ {1:} of A".format( F, float( F ) / A ) )
    aListOfRESULTs.append( F )

#----------------------------------------------------------------------
print( [ aFun( aListOfRESULTs ) for aFun in ( np.min, np.mean, np.max ) ] )
#----------------------------------------------------------------------

Without these hard facts collected ( as a form of quantitative records-of-evidence ), one could hardly decide what the best performance-boosting step would be here for any massive-scale prediction-pipeline image processing.

Having these items tested, post the results, and the further steps ( be it going via multiprocessing.Pool or using another strategy for larger performance scaling, towards whatever higher performance ) may then get reasonably evaluated, as the hard facts will have been collected first.
