How to prefetch data using a custom Python function in TensorFlow


Question

I am trying to prefetch training data to hide I/O latency. I would like to write custom Python code that loads data from disk and preprocesses it (e.g. by adding a context window). In other words, one thread does the data preprocessing while another does the training. Is this possible in TensorFlow?

Update: I have a working example based on @mrry's example.

import numpy as np
import tensorflow as tf
import threading

BATCH_SIZE = 5
TRAINING_ITERS = 4100

feature_input = tf.placeholder(tf.float32, shape=[128])
label_input = tf.placeholder(tf.float32, shape=[128])

q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]])
enqueue_op = q.enqueue([label_input, feature_input])

label_batch, feature_batch = q.dequeue_many(BATCH_SIZE)
c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128])

sess = tf.Session()

def load_and_enqueue(sess, enqueue_op, coord):
  # Open both files in binary mode, since np.fromfile reads raw bytes.
  with open('dummy_data/features.bin', 'rb') as feature_file, open('dummy_data/labels.bin', 'rb') as label_file:
    while not coord.should_stop():
      feature_array = np.fromfile(feature_file, np.float32, 128)
      if feature_array.shape[0] == 0:
        print('reach end of file, reset using seek(0,0)')
        feature_file.seek(0,0)
        label_file.seek(0,0)
        continue
      label_value = np.fromfile(label_file, np.float32, 128)

      sess.run(enqueue_op, feed_dict={feature_input: feature_array,
                                      label_input: label_value})

coord = tf.train.Coordinator()
t = threading.Thread(target=load_and_enqueue, args=(sess,enqueue_op, coord))
t.start()

for i in range(TRAINING_ITERS):
  result = sess.run(c)  # named result so the built-in sum() is not shadowed
  print('train_iter='+str(i))
  print(result)

coord.request_stop()
coord.join([t])

Recommended answer

This is a common use case, and most implementations use TensorFlow's queues to decouple the preprocessing code from the training code. There is a tutorial on how to use queues, but the main steps are as follows:

  1. Define a queue, q, that will buffer the preprocessed data. TensorFlow supports the simple tf.FIFOQueue, which produces elements in the order they were enqueued, and the more advanced tf.RandomShuffleQueue, which produces elements in a random order. A queue element is a tuple of one or more tensors (which can have different types and shapes). All queues support single-element (enqueue, dequeue) and batch (enqueue_many, dequeue_many) operations, but to use the batch operations you must specify the shape of each tensor in a queue element when constructing the queue.

  2. Build a subgraph that enqueues preprocessed elements into the queue. One way to do this is to define some tf.placeholder() ops for tensors corresponding to a single input example, then pass them to q.enqueue(). (If your preprocessing produces a batch at once, you should use q.enqueue_many() instead.) You might also include TensorFlow ops in this subgraph.

  3. Build a subgraph that performs training. This will look like a regular TensorFlow graph, but will get its input by calling q.dequeue_many(BATCH_SIZE).

  4. Start your session.

  5. Create one or more threads that execute your preprocessing logic, then execute the enqueue op, feeding in the preprocessed data. You may find the tf.train.Coordinator and tf.train.QueueRunner utility classes useful for this.

  6. Run your training graph (optimizer, etc.) as normal.
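The steps above can be sketched without TensorFlow, using only the Python standard library. This is an analogy rather than the TensorFlow API: queue.Queue stands in for tf.FIFOQueue, the hypothetical helper dequeue_many loosely mimics q.dequeue_many(BATCH_SIZE), and load_examples fakes the disk read and preprocessing. The dataset size and queue capacity are arbitrary assumptions.

```python
import queue
import threading

BATCH_SIZE = 4
NUM_EXAMPLES = 12   # assumed dataset size for this sketch
_DONE = object()    # sentinel marking the end of the data


def load_examples():
    """Hypothetical stand-in for reading and preprocessing data from disk."""
    for i in range(NUM_EXAMPLES):
        features = [float(i)] * 3
        label = i % 2
        yield features, label


def load_and_enqueue(q):
    """Producer: runs in its own thread and blocks while the queue is full."""
    for example in load_examples():
        q.put(example)
    q.put(_DONE)


def dequeue_many(q, n):
    """Consumer helper: block until n examples are available or the data ends."""
    batch = []
    while len(batch) < n:
        item = q.get()
        if item is _DONE:
            q.put(_DONE)  # leave the sentinel in place for any later call
            break
        batch.append(item)
    return batch


def run_training():
    q = queue.Queue(maxsize=8)  # bounded buffer, like the queue capacity
    t = threading.Thread(target=load_and_enqueue, args=(q,))
    t.start()
    batches = []
    while True:
        batch = dequeue_many(q, BATCH_SIZE)
        if not batch:
            break
        batches.append(batch)  # a real loop would run a training step here
    t.join()
    return batches


if __name__ == "__main__":
    for batch in run_training():
        print([label for _, label in batch])
```

Because the queue is bounded, the producer can only run a fixed number of examples ahead of the consumer, which is the same back-pressure that the capacity argument of a TensorFlow queue provides.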

Here's a simple load_and_enqueue() function and code fragment to get you started:

import threading

import numpy
import tensorflow as tf

# BATCH_SIZE and TRAINING_EPOCHS are assumed to be defined elsewhere.

# Features are length-100 vectors of floats.
feature_input = tf.placeholder(tf.float32, shape=[100])
# Labels are scalar integers.
label_input = tf.placeholder(tf.int32, shape=[])

# Alternatively, could do:
# feature_batch_input = tf.placeholder(tf.float32, shape=[None, 100])
# label_batch_input = tf.placeholder(tf.int32, shape=[None])

q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []])
enqueue_op = q.enqueue([feature_input, label_input])

# For batch input, do:
# enqueue_op = q.enqueue_many([feature_batch_input, label_batch_input])

feature_batch, label_batch = q.dequeue_many(BATCH_SIZE)
# Build rest of model taking label_batch, feature_batch as input.
# [...]
train_op = ...

sess = tf.Session()

def load_and_enqueue():
  # The files hold raw binary data, so they should be opened in binary mode.
  with open(...) as feature_file, open(...) as label_file:
    while True:
      feature_array = numpy.fromfile(feature_file, numpy.float32, 100)
      # An empty read means the end of the feature file has been reached.
      if feature_array.size == 0:
        return
      # Each example's label is a single int32 read from the label file.
      label_value = numpy.fromfile(label_file, numpy.int32, 1)[0]

      sess.run(enqueue_op, feed_dict={feature_input: feature_array,
                                      label_input: label_value})

# Start a thread to enqueue data asynchronously, and hide I/O latency.
t = threading.Thread(target=load_and_enqueue)
t.start()

for _ in range(TRAINING_EPOCHS):
  sess.run(train_op)
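One thing the fragment above leaves open is shutdown: if training finishes before the data runs out, the loader thread can end up blocked on a full queue. The sketch below illustrates the cooperative-stop idea behind tf.train.Coordinator using only the standard library. It is not TensorFlow code: threading.Event stands in for the coordinator's stop flag, queue.Queue for the TensorFlow queue, and endless_examples is a hypothetical data source.

```python
import itertools
import queue
import threading


def endless_examples():
    """Hypothetical data source that never runs out (e.g. looping over a file)."""
    return itertools.count()


def load_and_enqueue(q, stop_event):
    """Producer that re-checks the stop flag instead of blocking forever."""
    for example in endless_examples():
        while not stop_event.is_set():
            try:
                q.put(example, timeout=0.05)
                break  # enqueued; move on to the next example
            except queue.Full:
                continue  # queue still full; re-check the stop flag
        if stop_event.is_set():
            return


def train(num_steps, capacity=4):
    q = queue.Queue(maxsize=capacity)
    stop_event = threading.Event()
    t = threading.Thread(target=load_and_enqueue, args=(q, stop_event))
    t.start()
    consumed = [q.get() for _ in range(num_steps)]  # one "training step" each
    stop_event.set()      # analogous to coord.request_stop()
    t.join(timeout=5.0)   # analogous to coord.join([t])
    return consumed, t.is_alive()


if __name__ == "__main__":
    consumed, still_running = train(num_steps=10)
    print(consumed, still_running)
```

With a real TensorFlow 1.x queue, a blocked enqueue is typically released by running q.close(cancel_pending_enqueues=True), which makes the pending sess.run call raise tf.errors.CancelledError for the loader thread to catch.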
