Tensorflow: Load data in multiple threads on CPU


Question


I have a Python class SceneGenerator which has multiple member functions for preprocessing and a generator function generate_data(). The basic structure is like this:

class SceneGenerator(object):
    def __init__(self):
        # some inits
        pass

    def generate_data(self):
        """
        Generator. Yield data X and labels y after some preprocessing
        """
        while True:
            # opening files, selecting data
            X, y = self.preprocess(some_params, filenames, ...)

            yield X, y


I used the class member function sceneGenerator.generate_data() with the Keras model.fit_generator() function to read the data from disk, preprocess it and yield it. In Keras, this is done on multiple CPU threads if the workers parameter of model.fit_generator() is set to something > 1, as sketched below.
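For reference, a minimal sketch of that Keras usage; model, some_params and all numbers below are placeholder values, not from the original post:

sceneGenerator = SceneGenerator(some_params)

# Hypothetical Keras call: `model` and the numbers are placeholders
model.fit_generator(sceneGenerator.generate_data(),
                    steps_per_epoch=1000,  # batches drawn per epoch
                    epochs=10,
                    workers=4)             # workers > 1: several CPU threads pull from the generator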


I now want to use the same SceneGenerator class in Tensorflow. My current approach is this:

sceneGenerator = SceneGenerator(some_params)
for X, y in sceneGenerator.generate_data():
    feed_dict = {ops['data']: X,
                 ops['labels']: y,
                 ops['is_training_pl']: True
                 }
    _, loss, prediction = sess.run([optimization_op, loss_op, pred_op],
                                   feed_dict=feed_dict)


This, however, is slow and does not use multiple threads. I found the tf.data.Dataset API and some documentation, but I have failed to implement the methods.


Edit: Note that I do not work with images, so image-loading mechanisms with file paths etc. do not apply here. My SceneGenerator loads data from hdf5 files, and not complete datasets but, depending on the initialization parameters, only parts of a dataset (a sketch of this is shown below). I would love to keep the generator function as it is and learn how this generator can be used directly as input for Tensorflow, running on multiple CPU threads. Rewriting the data from the hdf5 files to csv is not a good option because it would duplicate lots of data.
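For illustration, a minimal sketch of what such a generator could look like, assuming h5py; the file list, slice parameters and dataset names 'X' and 'y' are hypothetical, not from the original post:

import h5py

class SceneGenerator(object):
    def __init__(self, filenames, start, stop):
        # Hypothetical parameters: which files to read and which
        # slice of each dataset this generator is responsible for
        self.filenames = filenames
        self.start, self.stop = start, stop

    def generate_data(self):
        while True:
            for filename in self.filenames:
                with h5py.File(filename, 'r') as f:
                    # Only the configured slice is read, not the full dataset
                    X = f['X'][self.start:self.stop]
                    y = f['y'][self.start:self.stop]
                for i in range(len(X)):
                    yield X[i], y[i]  # one sample at a time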


Edit 2: I think something similar to this could help: parallelising tf.data.Dataset.from_generator

Answer


Assuming you're using the latest Tensorflow (1.4 at the time of this writing), you can keep the generator and use the tf.data.* API as follows (I chose arbitrary values for the thread number, prefetch buffer size, batch size and output data types):

NUM_THREADS = 5
sceneGen = SceneGenerator()
# Wrap the Python generator in a Dataset
dataset = tf.data.Dataset.from_generator(sceneGen.generate_data, output_types=(tf.float32, tf.int32))
# The identity map exists only so that num_parallel_calls threads pull from the generator
dataset = dataset.map(lambda x, y: (x, y), num_parallel_calls=NUM_THREADS).prefetch(buffer_size=1000)
dataset = dataset.batch(42)
X, y = dataset.make_one_shot_iterator().get_next()
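The tensors X and y then replace your placeholders and the feed_dict loop entirely; a hedged sketch, assuming the loss_op, optimization_op and pred_op from your question are built on top of X and y instead of on placeholders:

# Sketch only: the ops are assumed to consume X and y directly,
# so no feed_dict is needed anymore
sess = tf.Session()
for step in range(num_steps):  # num_steps is a placeholder
    _, loss, prediction = sess.run([optimization_op, loss_op, pred_op])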


To show that it's actually multiple threads extracting from the generator, I modified your class as follows:

import threading

class SceneGenerator(object):
  def __init__(self):
    # some inits
    pass

  def generate_data(self):
    """
    Generator. Yield data X and labels y after some preprocessing
    """
    while True:
      # opening files, selecting data
      X, y = threading.get_ident(), 2  # self.preprocess(some_params, filenames, ...)
      yield X, y


This way, creating a Tensorflow session and getting one batch shows the thread IDs of the threads getting the data. On my PC, running:

sess = tf.Session()
print(sess.run([X, y]))

prints

[array([  8460.,   8460.,   8460.,  15912.,  16200.,  16200.,   8460.,
         15912.,  16200.,   8460.,  15912.,  16200.,  16200.,   8460.,
         15912.,  15912.,   8460.,   8460.,   6552.,  15912.,  15912.,
          8460.,   8460.,  15912.,   9956.,  16200.,   9956.,  16200.,
         15912.,  15912.,   9956.,  16200.,  15912.,  16200.,  16200.,
         16200.,   6552.,  16200.,  16200.,   9956.,   6552.,   6552.], dtype=float32),
 array([2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
        2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2])]


Note: You might want to experiment with removing the map call (which we only use to get multiple threads) and checking whether the prefetch buffer alone is enough to remove the bottleneck in your input pipeline (even with only one thread, input preprocessing is often faster than the actual graph execution, so the buffer is enough to let the preprocessing go as fast as it can). A simplified pipeline is sketched below.
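The simplified variant without the map call would then look like this (same arbitrary buffer and batch sizes as above):

# A single thread pulls from the generator; prefetch keeps a buffer
# of preprocessed elements ready for the training loop
dataset = tf.data.Dataset.from_generator(sceneGen.generate_data,
                                         output_types=(tf.float32, tf.int32))
dataset = dataset.prefetch(buffer_size=1000).batch(42)
X, y = dataset.make_one_shot_iterator().get_next()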
