Keras Tensorflow-从多个线程进行预测时发生异常 [英] Keras Tensorflow - Exception while predicting from multiple threads
问题描述
我正在使用带有tensorflow 1.3.0后端的keras 2.0.8.
I am using keras 2.0.8 with tensorflow 1.3.0 backend.
我正在将类加载到init类中,然后将其用于预测多线程.
I am loading a model in the class init and then use it to predict multithreaded.
import tensorflow as tf
from keras import backend as K
from keras.models import load_model
class CNN:
def __init__(self, model_path):
self.cnn_model = load_model(model_path)
self.session = K.get_session()
self.graph = tf.get_default_graph()
def query_cnn(self, data):
X = self.preproccesing(data)
with self.session.as_default():
with self.graph.as_default():
return self.cnn_model.predict(X)
我一次初始化了CNN,query_cnn方法是从多个线程发生的.
I initialize the CNN once and the query_cnn method happens from multiple threads.
我在日志中遇到的异常是:
The exception i get in my log is:
File "/home/*/Similarity/CNN.py", line 43, in query_cnn
return self.cnn_model.predict(X)
File "/usr/local/lib/python3.5/dist-packages/keras/models.py", line 913, in predict
return self.model.predict(x, batch_size=batch_size, verbose=verbose)
File "/usr/local/lib/python3.5/dist-packages/keras/engine/training.py", line 1713, in predict
verbose=verbose, steps=steps)
File "/usr/local/lib/python3.5/dist-packages/keras/engine/training.py", line 1269, in _predict_loop
batch_outs = f(ins_batch)
File "/usr/local/lib/python3.5/dist-packages/keras/backend/tensorflow_backend.py", line 2273, in __call__
**self.session_kwargs)
File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/client/session.py", line 895, in run
run_metadata_ptr)
File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/client/session.py", line 1124, in _run
feed_dict_tensor, options, run_metadata)
File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/client/session.py", line 1321, in _do_run
options, run_metadata)
File "/usr/local/lib/python3.5/dist-packages/tensorflow/python/client/session.py", line 1340, in _do_call
raise type(e)(node_def, op, message)
tensorflow.python.framework.errors_impl.NotFoundError: PruneForTargets: Some target nodes not found: group_deps
该代码在大多数情况下都可以正常工作,这可能与多线程有关.
The code works fine most of the times, its probably some problem with the multithreading.
我该如何解决?
推荐答案
确保在创建其他线程之前完成图形的创建.
Make sure you finish the graph creation before creating the other threads.
在图形上调用finalize()
可能会帮助您.
Calling finalize()
on the graph may help you with that.
def __init__(self, model_path):
self.cnn_model = load_model(model_path)
self.session = K.get_session()
self.graph = tf.get_default_graph()
self.graph.finalize()
更新1: finalize()
将使您的图形为只读,因此可以安全地在多个线程中使用它.副作用是,它会帮助您发现意外行为,有时还会导致内存泄漏,因为当您尝试修改图形时会引发异常.
Update 1: finalize()
will make your graph read-only so it can be safely used in multiple threads. As a side effect, it will help you find unintentional behavior and sometimes memory leaks as it will throw an exception when you try to modify the graph.
想象一下,您有一个线程对输入进行了一种热编码. (不好的例子:)
Imagine that you have a thread that does for instance one hot encoding of your inputs. (bad example:)
def preprocessing(self, data):
one_hot_data = tf.one_hot(data, depth=self.num_classes)
return self.session.run(one_hot_data)
如果您在图表中打印对象的数量,您会注意到它会随着时间的推移而增加
If you print the amount of objects in the graph you will notice that it will increase over time
# amount of nodes in tf graph
print(len(list(tf.get_default_graph().as_graph_def().node)))
但是,如果您首先定义图形,则情况并非如此(稍好一些的代码):
But if you define the graph first that won't be the case (slightly better code):
def preprocessing(self, data):
# run pre-created operation with self.input as placeholder
return self.session.run(self.one_hot_data, feed_dict={self.input: data})
更新2:根据此线程您需要在进行多线程处理之前在keras模型上调用model._make_predict_function()
.
Update 2: According to this thread you need to call model._make_predict_function()
on a keras model before doing multithreading.
Keras首次调用预报()时会构建GPU函数.那 方式,如果您从不调用预测",则可以节省一些时间和资源. 但是,您第一次调用预测"的速度比每次调用都要慢 其他时间.
Keras builds the GPU function the first time you call predict(). That way, if you never call predict, you save some time and resources. However, the first time you call predict is slightly slower than every other time.
更新的代码:
def __init__(self, model_path):
self.cnn_model = load_model(model_path)
self.cnn_model._make_predict_function() # have to initialize before threading
self.session = K.get_session()
self.graph = tf.get_default_graph()
self.graph.finalize() # make graph read-only
更新3:我做了预热的概念证明,因为_make_predict_function()
似乎没有按预期工作.
首先,我创建了一个虚拟模型:
Update 3: I did a proof of concept of a warming up, because _make_predict_function()
doesn't seems to work as expected.
First I created a dummy model:
import tensorflow as tf
from keras.layers import *
from keras.models import *
model = Sequential()
model.add(Dense(256, input_shape=(2,)))
model.add(Dense(1, activation='softmax'))
model.compile(loss='mean_squared_error', optimizer='adam')
model.save("dummymodel")
然后在另一个脚本中,我加载了该模型并使其在多个线程上运行
Then in another script I loaded that model and made it run on multiple threads
import tensorflow as tf
from keras import backend as K
from keras.models import load_model
import threading as t
import numpy as np
K.clear_session()
class CNN:
def __init__(self, model_path):
self.cnn_model = load_model(model_path)
self.cnn_model.predict(np.array([[0,0]])) # warmup
self.session = K.get_session()
self.graph = tf.get_default_graph()
self.graph.finalize() # finalize
def preproccesing(self, data):
# dummy
return data
def query_cnn(self, data):
X = self.preproccesing(data)
with self.session.as_default():
with self.graph.as_default():
prediction = self.cnn_model.predict(X)
print(prediction)
return prediction
cnn = CNN("dummymodel")
th = t.Thread(target=cnn.query_cnn, kwargs={"data": np.random.random((500, 2))})
th2 = t.Thread(target=cnn.query_cnn, kwargs={"data": np.random.random((500, 2))})
th3 = t.Thread(target=cnn.query_cnn, kwargs={"data": np.random.random((500, 2))})
th4 = t.Thread(target=cnn.query_cnn, kwargs={"data": np.random.random((500, 2))})
th5 = t.Thread(target=cnn.query_cnn, kwargs={"data": np.random.random((500, 2))})
th.start()
th2.start()
th3.start()
th4.start()
th5.start()
th2.join()
th.join()
th3.join()
th5.join()
th4.join()
评论热身路线并最终确定,我能够重现您的第一期
Commenting the lines for the warmingup and finalize I was able to reproduce your first issue
这篇关于Keras Tensorflow-从多个线程进行预测时发生异常的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!