How to *actually* read CSV data in TensorFlow?


Question

I'm relatively new to the world of TensorFlow, and pretty perplexed by how you'd actually read CSV data into usable example/label tensors in TensorFlow. The example from the TensorFlow tutorial on reading CSV data is pretty fragmented and only gets you part of the way to being able to train on CSV data.

Here's my code that I've pieced together, based off that CSV tutorial:

from __future__ import print_function
import tensorflow as tf

def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

filename = "csv_test_data.csv"

# setup text reader
file_length = file_len(filename)
filename_queue = tf.train.string_input_producer([filename])
reader = tf.TextLineReader(skip_header_lines=1)
_, csv_row = reader.read(filename_queue)

# setup CSV decoding
record_defaults = [[0],[0],[0],[0],[0]]
col1,col2,col3,col4,col5 = tf.decode_csv(csv_row, record_defaults=record_defaults)

# turn features back into a tensor
features = tf.pack([col1,col2,col3,col4])

print("loading, " + str(file_length) + " line(s)\n")
with tf.Session() as sess:
  tf.initialize_all_variables().run()

  # start populating filename queue
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  for i in range(file_length):
    # retrieve a single instance
    example, label = sess.run([features, col5])
    print(example, label)

  coord.request_stop()
  coord.join(threads)
  print("\ndone loading")

And here is a brief example from the CSV file I'm loading - pretty basic data - 4 feature columns and 1 label column:

0,0,0,0,0
0,15,0,0,0
0,30,0,0,0
0,45,0,0,0

All the code above does is print each example from the CSV file, one by one, which, while nice, is pretty darn useless for training.

What I'm struggling with here is how you'd actually turn those individual examples, loaded one-by-one, into a training dataset. For example, here's a notebook I was working on in the Udacity Deep Learning course. I basically want to take the CSV data I'm loading, and plop it into something like train_dataset and train_labels:

def reformat(dataset, labels):
  dataset = dataset.reshape((-1, image_size * image_size)).astype(np.float32)
  # Map 2 to [0.0, 1.0, 0.0 ...], 3 to [0.0, 0.0, 1.0 ...]
  labels = (np.arange(num_labels) == labels[:,None]).astype(np.float32)
  return dataset, labels
train_dataset, train_labels = reformat(train_dataset, train_labels)
valid_dataset, valid_labels = reformat(valid_dataset, valid_labels)
test_dataset, test_labels = reformat(test_dataset, test_labels)
print('Training set', train_dataset.shape, train_labels.shape)
print('Validation set', valid_dataset.shape, valid_labels.shape)
print('Test set', test_dataset.shape, test_labels.shape)

I've tried using tf.train.shuffle_batch, like this, but it just inexplicably hangs:

  for i in range(file_length):
    # retrieve a single instance
    example, label = sess.run([features, colRelevant])
    example_batch, label_batch = tf.train.shuffle_batch([example, label], batch_size=file_length, capacity=file_length, min_after_dequeue=10000)
    print(example, label)

So to sum up, here are my questions:

  • What am I missing about this process?
    • It feels like there is some key intuition that I'm missing about how to properly build an input pipeline.
  • Is there a way to avoid having to know the length of the CSV file?
    • It feels pretty inelegant to have to know the number of lines you want to process (the for i in range(file_length) line of code above)

Edit: As soon as Yaroslav pointed out that I was likely mixing up imperative and graph-construction parts here, it started to become clearer. I was able to pull together the following code, which I think is closer to what would typically be done when training a model from CSV (excluding any model training code):

from __future__ import print_function
import numpy as np
import tensorflow as tf
import math as math
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('dataset')
args = parser.parse_args()

def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

def read_from_csv(filename_queue):
  reader = tf.TextLineReader(skip_header_lines=1)
  _, csv_row = reader.read(filename_queue)
  record_defaults = [[0],[0],[0],[0],[0]]
  colHour,colQuarter,colAction,colUser,colLabel = tf.decode_csv(csv_row, record_defaults=record_defaults)
  features = tf.pack([colHour,colQuarter,colAction,colUser])  
  label = tf.pack([colLabel])  
  return features, label

def input_pipeline(batch_size, num_epochs=None):
  filename_queue = tf.train.string_input_producer([args.dataset], num_epochs=num_epochs, shuffle=True)  
  example, label = read_from_csv(filename_queue)
  min_after_dequeue = 10000
  capacity = min_after_dequeue + 3 * batch_size
  example_batch, label_batch = tf.train.shuffle_batch(
      [example, label], batch_size=batch_size, capacity=capacity,
      min_after_dequeue=min_after_dequeue)
  return example_batch, label_batch

file_length = file_len(args.dataset) - 1
examples, labels = input_pipeline(file_length, 1)

with tf.Session() as sess:
  tf.initialize_all_variables().run()

  # start populating filename queue
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)

  try:
    while not coord.should_stop():
      example_batch, label_batch = sess.run([examples, labels])
      print(example_batch)
  except tf.errors.OutOfRangeError:
    print('Done training, epoch reached')
  finally:
    coord.request_stop()

  coord.join(threads) 

Solution

I think you are mixing up imperative and graph-construction parts here. The operation tf.train.shuffle_batch creates a new queue node, and a single node can be used to process the entire dataset. So I think you are hanging because you created a bunch of shuffle_batch queues in your for loop and didn't start queue runners for them.

Normal input pipeline usage looks like this (a minimal skeleton follows the list):

  1. Add nodes like shuffle_batch to input pipeline
  2. (optional, to prevent unintentional graph modification) finalize graph

--- end of graph construction, beginning of imperative programming ---

  1. tf.start_queue_runners
  2. while(True): session.run()
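
As a minimal sketch of that ordering (assuming the input_pipeline function from the question's edit above is already defined; the batch size here is just illustrative), all graph construction happens once, before the run loop:

# --- graph construction ---
examples, labels = input_pipeline(batch_size=100, num_epochs=1)  # adds the shuffle_batch queue node
init_op = tf.initialize_all_variables()
tf.get_default_graph().finalize()  # optional: any later accidental graph modification now raises

# --- imperative part ---
with tf.Session() as sess:
  sess.run(init_op)
  coord = tf.train.Coordinator()
  threads = tf.train.start_queue_runners(coord=coord)
  try:
    while not coord.should_stop():
      example_batch, label_batch = sess.run([examples, labels])
  except tf.errors.OutOfRangeError:
    pass  # num_epochs exhausted
  finally:
    coord.request_stop()
  coord.join(threads)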

To be more scalable (to avoid the Python GIL), you could generate all of your data using a TensorFlow pipeline. However, if performance is not critical, you can hook up a numpy array to an input pipeline by using slice_input_producer. Here's an example with some Print nodes to see what's going on (the messages in Print go to stdout when the node is run):

import numpy as np
import tensorflow as tf

tf.reset_default_graph()

num_examples = 5
num_features = 2
data = np.reshape(np.arange(num_examples*num_features), (num_examples, num_features))
print(data)

# slice_input_producer dequeues one row of the constant at a time
(data_node,) = tf.train.slice_input_producer([tf.constant(data)], num_epochs=1, shuffle=False)
data_node_debug = tf.Print(data_node, [data_node], "Dequeueing from data_node ")
# group rows into batches of 2
data_batch = tf.train.batch([data_node_debug], batch_size=2)
data_batch_debug = tf.Print(data_batch, [data_batch], "Dequeueing from data_batch ")

sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
tf.get_default_graph().finalize()
tf.train.start_queue_runners()

try:
  while True:
    print(sess.run(data_batch_debug))
except tf.errors.OutOfRangeError:
  print("No more inputs.")

You should see something like this

[[0 1]
 [2 3]
 [4 5]
 [6 7]
 [8 9]]
[[0 1]
 [2 3]]
[[4 5]
 [6 7]]
No more inputs.

The "8, 9" numbers didn't fill up the full batch, so they didn't get produced. Also tf.Print are printed to sys.stdout, so they show up in separately in Terminal for me.

PS: a minimal example of connecting batch to a manually initialized queue is in github issue 2193

Also, for debugging purposes you might want to set a timeout on your session so that your IPython notebook doesn't hang on empty queue dequeues. I use this helper function for my sessions:

def create_session():
  config = tf.ConfigProto(log_device_placement=True)
  config.gpu_options.per_process_gpu_memory_fraction=0.3 # don't hog all vRAM
  config.operation_timeout_in_ms=60000   # terminate on long hangs
  # create interactive session to register a default session
  sess = tf.InteractiveSession("", config=config)
  return sess
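
Typical use is just to call the helper wherever you'd otherwise create a session; with operation_timeout_in_ms set, a run that blocks on an empty queue should error out after the timeout (as a deadline-exceeded error, if I remember correctly) instead of hanging the notebook:

sess = create_session()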

Scalability Notes:

  1. tf.constant inlines a copy of your data into the Graph. There's a fundamental 2GB limit on the size of the Graph definition, so that's an upper limit on the size of your data
  2. You could get around that limit by creating v = tf.Variable and loading the data into it by running an assign op with a tf.placeholder on the right-hand side, feeding a numpy array to the placeholder via feed_dict (see the sketch after this list)
  3. That still creates two copies of the data, so to save memory you could make your own version of slice_input_producer which operates on numpy arrays and uploads rows one at a time using feed_dict
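
A rough sketch of the variable/placeholder trick from point 2 (the array and its shape here are made up for illustration):

import numpy as np
import tensorflow as tf

data = np.random.rand(10000, 4).astype(np.float32)  # hypothetical dataset

# the variable's contents live outside the GraphDef, so the 2GB limit doesn't apply to them
data_var = tf.Variable(tf.zeros(data.shape, dtype=tf.float32), trainable=False)
data_placeholder = tf.placeholder(tf.float32, shape=data.shape)
assign_op = data_var.assign(data_placeholder)

with tf.Session() as sess:
  sess.run(tf.initialize_all_variables())
  # the numpy array travels through feed_dict instead of being baked into the graph
  sess.run(assign_op, feed_dict={data_placeholder: data})
  # data_var can now be fed into e.g. tf.train.slice_input_producer([data_var], ...)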
