Replacing queue-based input pipelines with tf.data


Question

I am reading Ganegedara's NLP with TensorFlow. The introduction to input pipelines has the following example:

import tensorflow as tf
import numpy as np
import os

# Defining the graph and session
graph = tf.Graph() # Creates a graph
session = tf.InteractiveSession(graph=graph) # Creates a session

# The filename queue
filenames = ['test%d.txt'%i for i in range(1,4)]
filename_queue = tf.train.string_input_producer(filenames, capacity=3, shuffle=True,name='string_input_producer')

# check if all files are there
for f in filenames:
    if not tf.gfile.Exists(f):
        raise ValueError('Failed to find file: ' + f)
    else:
        print('File %s found.'%f)

# Reader which takes a filename queue and 
# read() which outputs data one by one
reader = tf.TextLineReader()

# read a line from the file and output it as a (key, value) pair
# only the value is used below; the key is ignored
key, value = reader.read(filename_queue, name='text_read_op')

# if any problems encountered with reading file 
# this is the value returned
record_defaults = [[-1.0], [-1.0], [-1.0], [-1.0], [-1.0], [-1.0], [-1.0], [-1.0], [-1.0], [-1.0]]

# decoding the read value to columns
col1, col2, col3, col4, col5, col6, col7, col8, col9, col10 = tf.decode_csv(value, record_defaults=record_defaults)
features = tf.stack([col1, col2, col3, col4, col5, col6, col7, col8, col9, col10])

# output x is randomly assigned a batch of data of batch_size 
# where the data is read from the txt files
x = tf.train.shuffle_batch([features], batch_size=3,
                           capacity=5, name='data_batch', 
                           min_after_dequeue=1,num_threads=1)

# QueueRunners fetch data from the queues and must be started explicitly
# The Coordinator coordinates multiple QueueRunners
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord, sess=session)

# Executing operations and evaluating nodes in the graph
tf.global_variables_initializer().run() # Initialize the variables

# Calculate h with x and print the results for 5 steps
for step in range(5):
    x_eval = session.run(x) 
    print('========== Step %d =========='%step)
    print('Evaluated data (x)')
    print(x_eval)
    print('')

# We also need to explicitly stop the coordinator 
# otherwise the process will hang indefinitely
coord.request_stop()
coord.join(threads)
session.close()

The output is as follows:

========== Step 0 ==========
Evaluated data (x)
[[0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1]
 [0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1]
 [0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1]]

========== Step 1 ==========
Evaluated data (x)
[[1.  0.9 0.8 0.7 0.6 0.5 0.4 0.3 0.2 0.1]
 [1.  0.9 0.8 0.7 0.6 0.5 0.4 0.3 0.2 0.1]
 [0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1]]

========== Step 2 ==========
Evaluated data (x)
[[0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1]
 [1.  0.9 0.8 0.7 0.6 0.5 0.4 0.3 0.2 0.1]
 [1.  0.9 0.8 0.7 0.6 0.5 0.4 0.3 0.2 0.1]]

========== Step 3 ==========
Evaluated data (x)
[[0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1. ]
 [0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1. ]
 [0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1. ]]

========== Step 4 ==========
Evaluated data (x)
[[0.1 0.2 0.3 0.4 0.5 0.6 0.7 0.8 0.9 1. ]
 [1.  0.9 0.8 0.7 0.6 0.5 0.4 0.3 0.2 0.1]
 [0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1]]

It also generates many warnings that queue-based input pipelines are deprecated and suggests using the tf.data module instead.

This is my attempt at using the tf.data module:

import tensorflow as tf
import numpy as np
import os

graph = tf.Graph()
session = tf.InteractiveSession(graph=graph)
filenames = ['test%d.txt'%i for i in range(1,4)]
record_defaults = [[-1.0]] * 10
features = tf.data.experimental.CsvDataset(filenames, record_defaults).batch(batch_size=3).shuffle(buffer_size=5)
x = features.make_one_shot_iterator().get_next()
x = tf.convert_to_tensor(x)
# Executing operations and evaluating nodes in the graph
tf.global_variables_initializer().run() # Initialize the variables
# Calculate h with x and print the results for 5 steps
for step in range(5):
    x_eval = session.run(x)
    print('========== Step %d =========='%step)
    print('Evaluated data (x)')
    print(x_eval)
    print('')
session.close()

This produces the following output:

========== Step 0 ==========
Evaluated data (x)
[[0.1 0.1 0.1]
 [0.2 0.2 0.2]
 [0.3 0.3 0.3]
 [0.4 0.4 0.4]
 [0.5 0.5 0.5]
 [0.6 0.6 0.6]
 [0.7 0.7 0.7]
 [0.8 0.8 0.8]
 [0.9 0.9 0.9]
 [1.  1.  1. ]]

========== Step 1 ==========
Evaluated data (x)
[[0.1 0.1 0.1]
 [0.1 0.1 0.1]
 [0.1 0.1 0.1]
 [0.1 0.1 0.1]
 [0.1 0.1 0.1]
 [0.1 0.1 0.1]
 [0.1 0.1 0.1]
 [0.1 0.1 0.1]
 [0.1 0.1 0.1]
 [0.1 0.1 0.1]]

========== Step 2 ==========
Evaluated data (x)
[[1.  1.  1. ]
 [0.9 0.9 0.9]
 [0.8 0.8 0.8]
 [0.7 0.7 0.7]
 [0.6 0.6 0.6]
 [0.5 0.5 0.5]
 [0.4 0.4 0.4]
 [0.3 0.3 0.3]
 [0.2 0.2 0.2]
 [0.1 0.1 0.1]]

========== Step 3 ==========
Evaluated data (x)
[[0.1 0.1 0.1]
 [0.2 0.2 0.1]
 [0.3 0.3 0.1]
 [0.4 0.4 0.1]
 [0.5 0.5 0.1]
 [0.6 0.6 0.1]
 [0.7 0.7 0.1]
 [0.8 0.8 0.1]
 [0.9 0.9 0.1]
 [1.  1.  0.1]]

========== Step 4 ==========
Evaluated data (x)
[[0.1 1.  1. ]
 [0.1 0.9 0.9]
 [0.1 0.8 0.8]
 [0.1 0.7 0.7]
 [0.1 0.6 0.6]
 [0.1 0.5 0.5]
 [0.1 0.4 0.4]
 [0.1 0.3 0.3]
 [0.1 0.2 0.2]
 [0.1 0.1 0.1]]

It looks like the original code samples 3 rows each time, whereas my attempt with tf.data samples 3 columns. Why is this, and how can I fix my code to make it equivalent to the original?

Recommended answer

I ended up finding my answer through someone else's code, in a question about the poor performance of TextLineDataset and decode_csv.
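
As for why the first attempt printed columns: as far as I can tell, CsvDataset yields each record as a tuple of 10 scalar tensors, one per column, so after batch(3) each element is a tuple of 10 tensors of shape (3,). tf.convert_to_tensor then packs that tuple along a new leading axis, which gives shape (10, 3) instead of (3, 10). The NumPy sketch below reproduces only that shape arithmetic; the rows array and the variable names are made up for illustration and are not read from the test files.

```python
import numpy as np

# Hypothetical stand-in for three CSV records of 10 values each.
rows = np.array([
    [0.1] * 10,
    [1.0, 0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1],
    [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
])

# What a batch of 3 records looks like when delivered column by column:
# a tuple of 10 arrays, each of shape (3,).
columns = tuple(rows[:, i] for i in range(10))

# Packing the tuple along a new leading axis gives one row per column.
packed = np.stack(columns, axis=0)
print(packed.shape)

# Stacking along the last axis gives one row per record again.
by_record = np.stack(columns, axis=1)
print(by_record.shape)
```

This is why the version that follows stacks the 10 columns into one tensor per line before batching. Keeping CsvDataset and replacing tf.convert_to_tensor(x) with tf.stack(x, axis=1) should presumably have the same effect, though I have not verified that variant.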

Here is my code, which uses tf.data to do something similar to the code in Ganegedara's book:

import tensorflow as tf
import numpy as np
import os

graph = tf.Graph()
session = tf.InteractiveSession(graph=graph)
filenames = ['test%d.txt'%i for i in range(1,4)]

record_defaults = [[-1.0]] * 10

features = tf.data.TextLineDataset(filenames=filenames)

def parse_csv(line):
    cols_types = [[-1.0]] * 10  # all required
    columns = tf.decode_csv(line, record_defaults=cols_types)
    return tf.stack(columns)

features = features.map(parse_csv).batch(batch_size=3).shuffle(buffer_size=5)

x = features.make_one_shot_iterator().get_next()
x = tf.convert_to_tensor(x)
W = tf.Variable(tf.random_uniform(shape=[10,5], minval=-0.1,maxval=0.1, dtype=tf.float32),name='W') 
b = tf.Variable(tf.zeros(shape=[5],dtype=tf.float32),name='b')
h = tf.nn.sigmoid(tf.matmul(x,W) + b) # Operation to be performed

tf.global_variables_initializer().run() # Initialize the variables

# Calculate h with x and print the results for 5 steps
for step in range(5):
    x_eval, h_eval = session.run([x,h]) 
    print('========== Step %d =========='%step)
    print('Evaluated data (x)')
    print(x_eval)
    print('Evaluated data (h)')
    print(h_eval)
    print('')
session.close()
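
One caveat about "similar": the code above calls batch before shuffle, so it reorders whole batches of 3 consecutive lines, while tf.train.shuffle_batch in the original mixes individual rows before grouping them. The plain-Python sketch below illustrates the difference between the two orderings. The batches helper and the integer row ids are hypothetical, and a full shuffle is used for simplicity, whereas tf.data's shuffle only mixes within a bounded buffer.

```python
import random


def batches(items, size):
    """Group items into consecutive lists of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


rows = list(range(9))  # hypothetical row ids standing in for CSV lines
rng = random.Random(0)

# batch -> shuffle: batches are reordered, rows inside each stay together
batch_then_shuffle = batches(rows, 3)
rng.shuffle(batch_then_shuffle)

# shuffle -> batch: rows are mixed first, then grouped into batches
shuffled_rows = rows[:]
rng.shuffle(shuffled_rows)
shuffle_then_batch = batches(shuffled_rows, 3)

print(batch_then_shuffle)
print(shuffle_then_batch)
```

If row-level mixing closer to the original is wanted, swapping the calls to .shuffle(buffer_size=5).batch(batch_size=3) is the usual order, under the assumption that a buffer of 5 gives enough mixing for this toy data.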
