Parallelizing model predictions in keras using multiprocessing for python


Question

I'm trying to perform model predictions in parallel using the model.predict command provided by keras, with tensorflow 1.14.0 on python 2.7. I have 5 model (.h5) files and would like the predict command to run on them in parallel. I'm using a multiprocessing pool to map the model filenames to the prediction function across multiple processes, as shown below:

import matplotlib as plt
import numpy as np
import cv2
from multiprocessing import Pool
pool=Pool()
def prediction(model_name):
    global input
    from tensorflow.keras.models import load_model
    model=load_model(model_name)
    ret_val=model.predict(input).tolist()[0]
    return ret_val

models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
start_time=time.time()
res=pool.map(prediction,models)
print('Total time taken: {}'.format(time.time() - start_time))
print(res)

The input is an image numpy array obtained from another part of the code. But on executing this I get the following,

Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
  File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
    self.run()
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
    return recv()
AttributeError: 'module' object has no attribute 'prediction'
AttributeError: 'module' object has no attribute 'prediction'

I'm not able to interpret this error message. How do I go about solving this? Any advice is much appreciated!

UPDATE 2: Thanks for all the pointers and for a full example, @sokato. I executed the exact code posted by @sokato, but I got the following error (I made the changes in my own code too and got the same error):

Traceback (most recent call last):
  File "stackoverflow.py", line 47, in <module>
    with multiprocessing.Pool() as p:
AttributeError: __exit__

UPDATE 3: Thanks for all the support. I think the issue in UPDATE 2 was due to using python2 instead of python3. I was able to solve the error given in UPDATE 2 for python2 by using "with closing(multiprocessing.Pool()) as p:" instead of just "with multiprocessing.Pool() as p:" in @sokato's code. Import the closing function as follows: from contextlib import closing
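
The closing() workaround described above can be sketched as follows. This is a minimal illustration rather than the code from the question: the square function and the pool size of 2 are placeholders chosen for the example, and the real worker would load a model and call predict.

```python
import multiprocessing
from contextlib import closing


def square(x):
    # Placeholder worker (an assumption for this sketch).
    return x * x


if __name__ == "__main__":
    # closing() calls p.close() on exit, so the same line works on
    # Python 2.7, where Pool is not itself a context manager.
    with closing(multiprocessing.Pool(2)) as p:
        results = p.map(square, range(5))
    p.join()  # wait for the worker processes to exit
    print(results)
```

Note that closing() only calls close(); the explicit join() afterwards waits for the workers to finish.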

New issue using a different approach, shown below:

I actually have multiple inputs coming in. Instead of loading each model again for every input, I want to load all the models beforehand and keep them in a list. I have done this as shown below:

import matplotlib as plt
import numpy as np
import cv2
import multiprocessing
import tensorflow as tf
from contextlib import closing
import time

models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
loaded_models=[]
for model in models:
    loaded_models.append(tf.keras.models.load_model(model))

def prediction(input_tuple):
    inputs,loaded_models=input_tuple
    predops=[]
    for model in loaded_models:
        predops.append(model.predict(inputs).tolist()[0])
    actops=[]
    for predop in predops:
        actops.append(predop.index(max(predop)))
    max_freqq = max(set(actops), key = actops.count) 
    return max_freqq

#....some pre-processing....#

# new_all_t is a list of tuples; each tuple holds one input from all_t
# and the list of loaded models, which is unpacked in the prediction
# function.

new_all_t=[]
for elem in all_t:
    new_all_t.append((elem,loaded_models))
start_time=time.time()
with closing(multiprocessing.Pool()) as p:
    predops=p.map(prediction,new_all_t)
print('Total time taken: {}'.format(time.time() - start_time))

new_all_t is a list of tuples; each tuple holds one input from all_t and the list of loaded models, which is unpacked in the prediction function. However, I now get the following error:

Traceback (most recent call last):
  File "trial_mult-ips.py", line 240, in <module>
    predops=p.map(prediction,new_all_t)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 253, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 572, in get
    raise self._value
NotImplementedError: numpy() is only available when eager execution is enabled.

What exactly does this indicate? How do I go about solving this?

UPDATE 4: I included the lines tf.compat.v1.enable_eager_execution() and tf.compat.v1.enable_v2_behavior() at the very beginning. Now I get the following error:

WARNING:tensorflow:From /home/nick/.local/lib/python2.7/site-packages/tensorflow/python/ops/math_grad.py:1250: where (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where

Traceback (most recent call last):
  File "the_other_end-mp.py", line 216, in <module>
    predops=p.map(prediction,modelon)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 253, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 572, in get
    raise self._value
ValueError: Resource handles are not convertible to numpy.

I'm not able to interpret this error message. How do I go about solving this? Any advice is much appreciated!

Recommended answer

So, I am unsure of some of your design choices but I gave it the best attempt with the given information. Specifically, I think there are maybe some issues with the global variable and the import statement within your parallel function.
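
One further likely contributor to the AttributeError in the question: there, pool=Pool() runs before prediction is defined. With the fork start method used on Linux, the workers are created at that moment, so their copy of the main module has no prediction attribute when a task arrives. A minimal sketch of the safer ordering follows; name_length is a hypothetical placeholder for the real prediction function.

```python
import multiprocessing


def name_length(model_name):
    # Placeholder work (an assumption for this sketch); a real worker
    # would load the model file and call predict.
    return len(model_name)


if __name__ == "__main__":
    models = ['model1.h5', 'model2.h5']
    # The pool is created only after name_length exists at module level,
    # so the forked workers can look the function up by name.
    pool = multiprocessing.Pool(2)
    try:
        print(pool.map(name_length, models))
    finally:
        pool.close()
        pool.join()
```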

  1. You should use shared variables and not global variables to share an input between processes. You can read more about shared memory if you want in the multiprocessing documentation.
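
As a rough illustration of the shared-variable idea, the sketch below wraps a multiprocessing.Array in a NumPy view. The names make_shared_array, SHARED, and row_sum are made up for this example, and it assumes the fork start method (the Linux default on Python 2.7), where workers inherit the module-level buffer.

```python
import ctypes
import multiprocessing

import numpy as np


def make_shared_array(shape):
    """Allocate a shared buffer of doubles and view it as a NumPy array."""
    size = int(np.prod(shape))
    # lock=False returns a raw ctypes array with no synchronization wrapper.
    base = multiprocessing.Array(ctypes.c_double, size, lock=False)
    return np.ctypeslib.as_array(base).reshape(shape)


# Module level, so that forked workers see the same buffer.
SHARED = make_shared_array((2, 3))
SHARED[:] = np.arange(6, dtype=float).reshape(2, 3)


def row_sum(i):
    # Reads from the shared buffer; nothing large is pickled per task.
    return float(SHARED[i].sum())


if __name__ == "__main__":
    pool = multiprocessing.Pool(2)
    try:
        print(pool.map(row_sum, [0, 1]))
    finally:
        pool.close()
        pool.join()
```

Only the row index travels to each worker; the array data itself stays in the shared buffer.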

I generated models from a tutorial since your models are not included.

You are not joining or closing your pool, but with the following code I was able to get the predictions to run in parallel successfully. You can close the pool by calling pool.close() or by using the "with" syntax shown below. Note that the "with" syntax does not work with multiprocessing.Pool in python 2.7, because Pool only became a context manager in python 3.3.

import numpy as np
import multiprocessing, time, ctypes, os
import tensorflow as tf

mis = (28, 28) #model input shape
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

def createModels(models):
    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=mis),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10)
    ])

    model.compile(optimizer='adam',
               loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
               metrics=['accuracy'])

    model.fit(x_train, y_train, epochs=5)

    for mod in models:
        model.save(mod)

def prediction(model_name):

    model=tf.keras.models.load_model(model_name)
    ret_val=model.predict(input).tolist()[0]
    return ret_val

if __name__ == "__main__":
    models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
    dir = os.listdir(".")
    if models[0] not in dir:
        createModels(models)
    # Shared array input
    ub = 100
    testShape = x_train[:ub].shape
    input_base = multiprocessing.Array(ctypes.c_double, 
    int(np.prod(testShape)),lock=False)
    input = np.ctypeslib.as_array(input_base)
    input = input.reshape(testShape)
    input[:ub] = x_train[:ub]

    # with multiprocessing.Pool() as p:  # Python 3 only
    p = multiprocessing.Pool()  # Python 2.7: create the pool explicitly
    start_time=time.time()
    res=p.map(prediction,models)
    p.close()  # Python 2.7: stop accepting new tasks
    p.join()   # wait for the worker processes to exit
    print('Total time taken: {}'.format(time.time() - start_time))
    print(res)
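
For the follow-up case in the question, where the models are loaded once and reused for many inputs, passing loaded models through p.map means they must be pickled to reach the workers. That is a plausible source of the later NotImplementedError and ValueError, although the tracebacks do not show that step, so treat it as an assumption. One common alternative is to load the models inside each worker through the Pool initializer. The sketch below uses a hypothetical load_stub_model in place of tf.keras.models.load_model so that it runs without TensorFlow.

```python
import multiprocessing
from contextlib import closing

_MODELS = None  # per-worker cache, filled by init_worker


def load_stub_model(name):
    """Hypothetical stand-in for tf.keras.models.load_model(name)."""
    scale = len(name)

    def predict(x):
        return x * scale

    return predict


def init_worker(model_names):
    # Runs once in every worker process, so each worker loads its own
    # copies and no model object is ever pickled.
    global _MODELS
    _MODELS = [load_stub_model(n) for n in model_names]


def predict_all(x):
    # Only the input x is sent to the worker for each task.
    return [model(x) for model in _MODELS]


if __name__ == "__main__":
    names = ['model1.h5', 'model2.h5']
    pool = multiprocessing.Pool(2, initializer=init_worker, initargs=(names,))
    with closing(pool) as p:
        print(p.map(predict_all, [1, 2, 3]))
    p.join()
```

With real Keras models, each worker pays the loading cost once at startup, and only the inputs and the predictions cross process boundaries.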

I hope this helps.
