Tensorflow: Load data in multiple threads on CPU
Question
I have a python class SceneGenerator which has multiple member functions for preprocessing and a generator function generate_data(). The basic structure is like this:
```python
class SceneGenerator(object):
    def __init__(self):
        # some inits
        pass

    def generate_data(self):
        """
        Generator. Yield data X and labels y after some preprocessing
        """
        while True:
            # opening files, selecting data
            X, y = self.preprocess(some_params, filenames, ...)
            yield X, y
```
I used the class member function sceneGenerator.generate_data() in the keras model.fit_generator() function to read the data from disk, preprocess it and yield it. In keras, this is done on multiple CPU threads if the workers parameter of model.fit_generator() is set to something > 1.
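The mechanism behind the workers parameter can be sketched with plain Python threads and a queue. This is only an illustration of the idea (several worker threads draining one shared, lock-guarded generator into a bounded queue that the training loop reads from), not keras's actual implementation, and generate_data here is a dummy stand-in:

```python
import queue
import threading

def generate_data():
    """Dummy stand-in for SceneGenerator.generate_data(): yields (X, y)."""
    i = 0
    while True:
        yield [float(i)], i % 2
        i += 1

def start_workers(gen, num_workers=4, max_queue_size=10):
    """Spawn worker threads that each pull from the shared generator
    (guarded by a lock, since generators are not thread-safe) and push
    the results into a bounded queue the training loop reads from."""
    q = queue.Queue(maxsize=max_queue_size)
    lock = threading.Lock()

    def worker():
        while True:
            with lock:  # only one thread may advance the generator at a time
                item = next(gen)
            q.put(item)  # blocks while the queue is full

    for _ in range(num_workers):
        threading.Thread(target=worker, daemon=True).start()
    return q

q = start_workers(generate_data(), num_workers=4)
batch = [q.get() for _ in range(8)]  # the training loop consumes from here
print(len(batch))  # → 8
```

The lock means the generator itself is still advanced serially; the benefit comes when the expensive work (the preprocessing) happens in the worker after the lock is released.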
I now want to use the same SceneGenerator class in tensorflow. My current approach is this:
```python
sceneGenerator = SceneGenerator(some_params)
for X, y in sceneGenerator.generate_data():
    feed_dict = {ops['data']: X,
                 ops['labels']: y,
                 ops['is_training_pl']: True}
    _, loss, prediction = sess.run([optimization_op, loss_op, pred_op],
                                   feed_dict=feed_dict)
```
This, however, is slow and does not use multiple threads. I found the tf.data.Dataset API with some documentation, but I fail to implement the methods.
Edit: Notice that I do not work with images, so the image loading mechanisms with file paths etc. do not work here. My SceneGenerator loads data from hdf5 files, but not complete datasets: depending on the initialization parameters, it loads only parts of a dataset. I would love to keep the generator function as it is and learn how this generator can be used directly as input for tensorflow, running on multiple threads on the CPU. Rewriting the data from the hdf5 files to csv is not a good option because it would duplicate lots of data.
Edit 2: I think something similar to this could help: parallelising tf.data.Dataset.from_generator
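The idea behind parallelising from_generator (several generator instances, each covering its own part of the data, with their outputs interleaved) can be sketched without tensorflow. Assuming hypothetically that SceneGenerator's initialization parameters select an index range, each shard reads a disjoint slice, which is what tf.data's interleave over several from_generator datasets would then run in parallel:

```python
import itertools

def make_shard(shard_id, num_shards, num_items=12):
    """Hypothetical stand-in for a sharded SceneGenerator: each shard
    yields only its own slice of the dataset indices, just as a
    parameterized SceneGenerator would read only part of an hdf5 file."""
    def gen():
        for i in range(shard_id, num_items, num_shards):
            yield i, i % 2  # dummy (X, y) pair
    return gen

NUM_SHARDS = 3
shards = [make_shard(s, NUM_SHARDS)() for s in range(NUM_SHARDS)]

# Round-robin interleave of the shard outputs, as
# tf.data.Dataset.interleave over from_generator datasets would produce.
interleaved = list(itertools.chain.from_iterable(zip(*shards)))
print([x for x, _ in interleaved])  # every index appears exactly once
```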
Answer
Assuming you're using the latest Tensorflow (1.4 at the time of this writing), you can keep the generator and use the tf.data.* API as follows (I chose arbitrary values for the thread number, prefetch buffer size, batch size and output data types):
```python
import tensorflow as tf

NUM_THREADS = 5

sceneGen = SceneGenerator()
dataset = tf.data.Dataset.from_generator(sceneGen.generate_data,
                                         output_types=(tf.float32, tf.int32))
dataset = dataset.map(lambda x, y: (x, y),
                      num_parallel_calls=NUM_THREADS).prefetch(buffer_size=1000)
dataset = dataset.batch(42)
X, y = dataset.make_one_shot_iterator().get_next()
```
To show that it's actually multiple threads extracting from the generator, I modified your class as follows:
```python
import threading

class SceneGenerator(object):
    def __init__(self):
        # some inits
        pass

    def generate_data(self):
        """
        Generator. Yield data X and labels y after some preprocessing
        """
        while True:
            # opening files, selecting data
            X, y = threading.get_ident(), 2  # self.preprocess(some_params, filenames, ...)
            yield X, y
```
This way, creating a Tensorflow session and getting one batch shows the thread IDs of the threads getting the data. On my pc, running:
```python
sess = tf.Session()
print(sess.run([X, y]))
```
prints
```
[array([ 8460.,  8460.,  8460., 15912., 16200., 16200.,  8460.,
        15912., 16200.,  8460., 15912., 16200., 16200.,  8460.,
        15912., 15912.,  8460.,  8460.,  6552., 15912., 15912.,
         8460.,  8460., 15912.,  9956., 16200.,  9956., 16200.,
        15912., 15912.,  9956., 16200., 15912., 16200., 16200.,
        16200.,  6552., 16200., 16200.,  9956.,  6552.,  6552.], dtype=float32),
 array([2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
        2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2])]
```
Note: You might want to experiment with removing the map call (which we only use to have multiple threads) and checking whether the prefetch buffer is enough to remove the bottleneck in your input pipeline (even with only one thread, the input preprocessing is often faster than the actual graph execution, so the buffer is enough to let the preprocessing go as fast as it can).
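The effect of prefetch alone can likewise be sketched with a single background thread filling a bounded buffer. This is only an illustration of the idea (not tf.data's actual implementation), with generate_data again a dummy stand-in:

```python
import queue
import threading
import time

def generate_data():
    """Dummy stand-in for SceneGenerator.generate_data()."""
    for i in range(20):
        time.sleep(0.001)  # pretend preprocessing takes a little time
        yield i

def prefetch(gen, buffer_size=8):
    """Run `gen` in a single background thread and buffer its output,
    analogous to dataset.prefetch(buffer_size): preprocessing overlaps
    with the consumer's training steps instead of alternating with them."""
    q = queue.Queue(maxsize=buffer_size)
    done = object()  # sentinel marking generator exhaustion

    def producer():
        for item in gen:
            q.put(item)  # blocks while the buffer is full
        q.put(done)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = q.get()
        if item is done:
            return
        yield item

results = list(prefetch(generate_data()))
print(results[:5])  # → [0, 1, 2, 3, 4]
```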