How to *actually* read CSV data in TensorFlow?
Question
I'm relatively new to the world of TensorFlow, and pretty perplexed by how you'd actually read CSV data into usable example/label tensors in TensorFlow. The example from the TensorFlow tutorial on reading CSV data is pretty fragmented and only gets you part of the way to being able to train on CSV data.
Here's my code that I've pieced together, based off that CSV tutorial:
from __future__ import print_function
import tensorflow as tf

def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

filename = "csv_test_data.csv"

# setup text reader
file_length = file_len(filename)
filename_queue = tf.train.string_input_producer([filename])
reader = tf.TextLineReader(skip_header_lines=1)
_, csv_row = reader.read(filename_queue)

# setup CSV decoding
record_defaults = [[0],[0],[0],[0],[0]]
col1,col2,col3,col4,col5 = tf.decode_csv(csv_row, record_defaults=record_defaults)

# turn features back into a tensor
features = tf.stack([col1,col2,col3,col4])

print("loading, " + str(file_length) + " line(s)\n")

with tf.Session() as sess:
    tf.initialize_all_variables().run()

    # start populating filename queue
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    for i in range(file_length):
        # retrieve a single instance
        example, label = sess.run([features, col5])
        print(example, label)

    coord.request_stop()
    coord.join(threads)
    print("\ndone loading")
And here is a brief example from the CSV file I'm loading - pretty basic data - 4 feature columns, and 1 label column:
0,0,0,0,0
0,15,0,0,0
0,30,0,0,0
0,45,0,0,0
All the code above does is print each example from the CSV file, one by one, which, while nice, is pretty darn useless for training.
What I'm struggling with here is how you'd actually turn those individual examples, loaded one-by-one, into a training dataset. For example, here's a notebook I was working on in the Udacity Deep Learning course. I basically want to take the CSV data I'm loading, and plop it into something like train_dataset and train_labels:
def reformat(dataset, labels):
    dataset = dataset.reshape((-1, image_size * image_size)).astype(np.float32)
    # Map 2 to [0.0, 1.0, 0.0 ...], 3 to [0.0, 0.0, 1.0 ...]
    labels = (np.arange(num_labels) == labels[:,None]).astype(np.float32)
    return dataset, labels

train_dataset, train_labels = reformat(train_dataset, train_labels)
valid_dataset, valid_labels = reformat(valid_dataset, valid_labels)
test_dataset, test_labels = reformat(test_dataset, test_labels)
print('Training set', train_dataset.shape, train_labels.shape)
print('Validation set', valid_dataset.shape, valid_labels.shape)
print('Test set', test_dataset.shape, test_labels.shape)
I've tried using tf.train.shuffle_batch, like this, but it just inexplicably hangs:
for i in range(file_length):
    # retrieve a single instance
    example, label = sess.run([features, colRelevant])
    example_batch, label_batch = tf.train.shuffle_batch([example, label], batch_size=file_length, capacity=file_length, min_after_dequeue=10000)
    print(example, label)
So to sum up, here are my questions:
- What am I missing in this process?
- It feels like I'm missing some key intuition about how to properly build an input pipeline.
- Having to know the number of lines you want to process feels pretty inelegant (the for i in range(file_length) line of code above).
As soon as Yaroslav pointed out that I was likely mixing up imperative and graph-construction parts here, it started to become clearer. I was able to pull together the following code, which I think is closer to what would typically be done when training a model from CSV (excluding any model training code):
from __future__ import print_function
import numpy as np
import tensorflow as tf
import math as math
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('dataset')
args = parser.parse_args()

def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

def read_from_csv(filename_queue):
    reader = tf.TextLineReader(skip_header_lines=1)
    _, csv_row = reader.read(filename_queue)
    record_defaults = [[0],[0],[0],[0],[0]]
    colHour,colQuarter,colAction,colUser,colLabel = tf.decode_csv(csv_row, record_defaults=record_defaults)
    features = tf.stack([colHour,colQuarter,colAction,colUser])
    label = tf.stack([colLabel])
    return features, label

def input_pipeline(batch_size, num_epochs=None):
    filename_queue = tf.train.string_input_producer([args.dataset], num_epochs=num_epochs, shuffle=True)
    example, label = read_from_csv(filename_queue)
    min_after_dequeue = 10000
    capacity = min_after_dequeue + 3 * batch_size
    example_batch, label_batch = tf.train.shuffle_batch(
        [example, label], batch_size=batch_size, capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    return example_batch, label_batch

file_length = file_len(args.dataset) - 1
examples, labels = input_pipeline(file_length, 1)

with tf.Session() as sess:
    tf.initialize_all_variables().run()
    # num_epochs creates a local counter variable behind the scenes,
    # so the local initializer is needed too
    tf.initialize_local_variables().run()

    # start populating filename queue
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    try:
        while not coord.should_stop():
            example_batch, label_batch = sess.run([examples, labels])
            print(example_batch)
    except tf.errors.OutOfRangeError:
        print('Done training, epoch reached')
    finally:
        coord.request_stop()
        coord.join(threads)
Answer
I think you are mixing up imperative and graph-construction parts here. The operation tf.train.shuffle_batch creates a new queue node, and a single node can be used to process the entire dataset. So I think you are hanging because you created a bunch of shuffle_batch queues in your for loop and didn't start queue runners for them.
Normal input pipeline usage looks like this:
- Add nodes like shuffle_batch to the input pipeline
- (optional, to prevent inadvertent graph modification) finalize the graph

--- end of graph construction, beginning of imperative programming ---

- tf.train.start_queue_runners
- while(True): session.run()
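To make that ordering concrete, here's a minimal sketch of my own (not code from the answer) that applies those steps to the CSV setup from the question; it assumes a csv_test_data.csv laid out like the sample above:

from __future__ import print_function
import tensorflow as tf

# --- graph construction: create every queue node exactly once ---
filename_queue = tf.train.string_input_producer(["csv_test_data.csv"])  # loops over the file forever
reader = tf.TextLineReader(skip_header_lines=1)
_, csv_row = reader.read(filename_queue)
col1, col2, col3, col4, col5 = tf.decode_csv(csv_row, record_defaults=[[0]] * 5)
features = tf.stack([col1, col2, col3, col4])
example_batch, label_batch = tf.train.shuffle_batch(
    [features, col5], batch_size=4, capacity=50, min_after_dequeue=10)

init_op = tf.initialize_all_variables()
tf.get_default_graph().finalize()  # optional: any later graph modification now raises an error

# --- imperative part: run the fixed graph ---
with tf.Session() as sess:
    sess.run(init_op)
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    for _ in range(10):  # a fixed number of steps, since the input repeats indefinitely
        print(sess.run([example_batch, label_batch]))
    coord.request_stop()
    coord.join(threads)

Note that shuffle_batch is built once, up front; only session.run is inside the loop.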
To be more scalable (to avoid the Python GIL), you could generate all of your data using the TensorFlow pipeline. However, if performance is not critical, you can hook up a numpy array to an input pipeline by using slice_input_producer. Here's an example with some Print nodes to see what's going on (messages in Print go to stdout when the node is run):

import numpy as np
import tensorflow as tf

tf.reset_default_graph()

num_examples = 5
num_features = 2
data = np.reshape(np.arange(num_examples * num_features), (num_examples, num_features))
print(data)

(data_node,) = tf.train.slice_input_producer([tf.constant(data)], num_epochs=1, shuffle=False)
data_node_debug = tf.Print(data_node, [data_node], "Dequeueing from data_node ")
data_batch = tf.train.batch([data_node_debug], batch_size=2)
data_batch_debug = tf.Print(data_batch, [data_batch], "Dequeueing from data_batch ")

sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
sess.run(tf.initialize_local_variables())  # num_epochs=1 creates a local counter variable
tf.get_default_graph().finalize()
tf.train.start_queue_runners()

try:
    while True:
        print(sess.run(data_batch_debug))
except tf.errors.OutOfRangeError as e:
    print("No more inputs.")
You should see something like this:
[[0 1]
 [2 3]
 [4 5]
 [6 7]
 [8 9]]
[[0 1]
 [2 3]]
[[4 5]
 [6 7]]
No more inputs.
8、9"号没有填满整批,所以它们没有被生产出来.此外,
tf.Print
被打印到 sys.stdout,所以它们分别显示在我的终端中.The "8, 9" numbers didn't fill up the full batch, so they didn't get produced. Also
tf.Print
are printed to sys.stdout, so they show up in separately in Terminal for me.PS:在github issue 2193<中将
batch
连接到手动初始化的队列的最小化/a>PS: a minimal of connecting
batch
to a manually initialized queue is in github issue 2193此外,出于调试目的,您可能希望在会话中设置
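I haven't copied the issue's code here, but the pattern is roughly the following sketch of my own: fill a tf.FIFOQueue with explicit session.run calls instead of a queue runner, then let tf.train.batch dequeue from it (the names and sizes are made up for illustration):

import numpy as np
import tensorflow as tf

data = np.arange(10, dtype=np.int32)

# a queue we initialize manually instead of via a QueueRunner
queue = tf.FIFOQueue(capacity=20, dtypes=[tf.int32], shapes=[[]])
enqueue_op = queue.enqueue_many([data])
close_op = queue.close()

# tf.train.batch adds its own queue (and queue runner) downstream
batch = tf.train.batch([queue.dequeue()], batch_size=2)

sess = tf.InteractiveSession()
sess.run(enqueue_op)  # manual initialization of the first queue
sess.run(close_op)    # signal end-of-input so batching can finish cleanly
tf.train.start_queue_runners()

try:
    while True:
        print(sess.run(batch))
except tf.errors.OutOfRangeError:
    print("No more inputs.")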
timeout
,以便您的 IPython 笔记本不会挂在空队列出队上.我在会话中使用此辅助函数Also, for debugging purposes you might want to set
timeout
on your session so that your IPython notebook doesn't hang on empty queue dequeues. I use this helper function for my sessionsdef create_session(): config = tf.ConfigProto(log_device_placement=True) config.gpu_options.per_process_gpu_memory_fraction=0.3 # don't hog all vRAM config.operation_timeout_in_ms=60000 # terminate on long hangs # create interactive session to register a default session sess = tf.InteractiveSession("", config=config) return sess
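With that helper, an accidental dequeue from an empty queue errors out after 60 seconds instead of blocking the notebook. If I remember the API right, the timeout surfaces as tf.errors.DeadlineExceededError; a quick sketch continuing from the helper above:

sess = create_session()
stuck_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.int32], shapes=[[]])
try:
    sess.run(stuck_queue.dequeue())  # nothing enqueued: would normally hang forever
except tf.errors.DeadlineExceededError:
    print("dequeue timed out instead of hanging")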
Scalability notes:

- tf.constant inlines a copy of your data into the Graph. There's a fundamental limit of 2GB on the size of the Graph definition, so that's an upper limit on the size of your data.
- You could get around that limit by using v=tf.Variable and saving the data into it by running v.assign_op with a tf.placeholder on the right-hand side, feeding the numpy array to the placeholder (feed_dict); see the sketch after this list.
- That still creates two copies of the data, so to save memory you could make your own version of slice_input_producer which operates on numpy arrays, and uploads rows one at a time using feed_dict.