What's going on in tf.train.shuffle_batch and tf.train.batch?

Problem description

I use binary data to train a DNN.

But tf.train.shuffle_batch and tf.train.batch confuse me.

This is my code and I will do some tests on it.

First, Using_Queues_Lib.py:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os

from six.moves import xrange  # pylint: disable=redefined-builtin
import tensorflow as tf

NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = 100
REAL32_BYTES=4


def read_dataset(filename_queue,data_length,label_length):
  class Record(object):
    pass
  result = Record()

  result_data  = data_length*REAL32_BYTES
  result_label = label_length*REAL32_BYTES
  record_bytes = result_data + result_label

  reader = tf.FixedLengthRecordReader(record_bytes=record_bytes)
  result.key, value = reader.read(filename_queue)

  record_bytes = tf.decode_raw(value, tf.float32)
  result.data  = tf.strided_slice(record_bytes, [0],[data_length])#record_bytes: tf.float list
  result.label = tf.strided_slice(record_bytes, [data_length],[data_length+label_length])
  return result


def _generate_data_and_label_batch(data, label, min_queue_examples, batch_size, shuffle):
  num_preprocess_threads = 16  # only affects speed
  if shuffle:
    data_batch, label_batch = tf.train.shuffle_batch(
        [data, label], batch_size=batch_size, num_threads=num_preprocess_threads,
        capacity=min_queue_examples + batch_size, min_after_dequeue=min_queue_examples)
  else:
    data_batch, label_batch = tf.train.batch(
        [data, label], batch_size=batch_size, num_threads=num_preprocess_threads,
        capacity=min_queue_examples + batch_size)
  return data_batch, label_batch

def inputs(data_dir, batch_size,data_length,label_length):
  filenames = [os.path.join(data_dir, 'test_data_SE.dat')]
  for f in filenames:
    if not tf.gfile.Exists(f):
      raise ValueError('Failed to find file: ' + f)

  filename_queue = tf.train.string_input_producer(filenames)

  read_input = read_dataset(filename_queue,data_length,label_length)

  read_input.data.set_shape([data_length])   #important
  read_input.label.set_shape([label_length]) #important


  min_fraction_of_examples_in_queue = 0.4
  min_queue_examples = int(NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN *
                       min_fraction_of_examples_in_queue)
  print ('Filling queue with %d samples before starting to train. '
     'This will take a few minutes.' % min_queue_examples)

  return _generate_data_and_label_batch(read_input.data, read_input.label,
                                     min_queue_examples, batch_size,
                                     shuffle=True)

Second, Using_Queues.py:

import Using_Queues_Lib
import tensorflow as tf
import numpy as np
import time


max_steps=10
batch_size=100
data_dir=r'.'
data_length=2
label_length=1

#-----------Save paras-----------
import struct
def WriteArrayFloat(file,data):
  fout=open(file,'wb')        
  fout.write(struct.pack('<'+str(data.flatten().size)+'f',
                                *data.flatten().tolist()))
  fout.close()
#-----------------------------

def add_layer(inputs, in_size, out_size, activation_function=None):
  Weights = tf.Variable(tf.truncated_normal([in_size, out_size]))
  biases = tf.Variable(tf.zeros([1, out_size]) + 0.1)
  Wx_plus_b = tf.matmul(inputs, Weights) + biases
  if activation_function is None:
    outputs = Wx_plus_b
  else:
    outputs = activation_function(Wx_plus_b)
  return outputs

data_train,labels_train=Using_Queues_Lib.inputs(data_dir=data_dir,
                      batch_size=batch_size,data_length=data_length,
                                          label_length=label_length)

xs=tf.placeholder(tf.float32,[None,data_length])
ys=tf.placeholder(tf.float32,[None,label_length])

l1 = add_layer(xs, data_length, 5, activation_function=tf.nn.sigmoid)
l2 = add_layer(l1, 5, 5, activation_function=tf.nn.sigmoid)
prediction = add_layer(l2, 5, label_length, activation_function=None)

loss = tf.reduce_mean(tf.square(ys - prediction))
train_step = tf.train.GradientDescentOptimizer(0.2).minimize(loss)

sess=tf.InteractiveSession()
tf.global_variables_initializer().run()

tf.train.start_queue_runners()

for i in range(max_steps):
  start_time=time.time()
  data_batch,label_batch=sess.run([data_train,labels_train])
  sess.run(train_step, feed_dict={xs: data_batch, ys: label_batch})
  duration=time.time()-start_time
  if i % 1 == 0:
    example_per_sec = batch_size / duration
    sec_per_batch = float(duration)
    WriteArrayFloat(r'./data/' + str(i) + '.bin',
        np.concatenate((data_batch, label_batch), axis=1))
    format_str = ('step %d, loss=%.8f (%.1f examples/sec; %.3f sec/batch)')
    loss_value = sess.run(loss, feed_dict={xs: data_batch, ys: label_batch})
    print(format_str % (i, loss_value, example_per_sec, sec_per_batch))

The data is in here, and it was generated by Mathematica.

file = OpenWrite["test_data_SE.dat", BinaryFormat -> True]; (* the file name expected by inputs() *)
data = Flatten@Table[{x, y, x*y}, {x, -1, 1, .05}, {y, -1, 1, .05}];
BinaryWrite[file, data, "Real32", ByteOrdering -> -1];
Close[file];

Length of data: 1681

The data looks like this:

Plot of the data: the red-to-green color indicates the order in which the points occur (in here).

Running Using_Queues.py produces ten batches, and I draw each batch in this graph (batch_size=100 and min_queue_examples=40):

If batch_size=1024 and min_queue_examples=40:

If batch_size=100 and min_queue_examples=4000:

If batch_size=1024 and min_queue_examples=4000:

And even if batch_size=1681 and min_queue_examples=4000:

The region is not filled with points.

Why?

So why does changing min_queue_examples make the batches more random? And how should the value of min_queue_examples be determined?

What's going on in tf.train.shuffle_batch?

Solution

The sampling function that tf.train.shuffle_batch() (and hence tf.RandomShuffleQueue) uses is a bit subtle. The implementation uses tf.RandomShuffleQueue.dequeue_many(batch_size), whose (simplified) implementation is as follows:

  • While the number of elements dequeued is less than batch_size:
    • Wait until the queue contains at least min_after_dequeue + 1 elements.
    • Select an element from the queue uniformly at random, remove it from the queue, and add it to the output batch.

The other thing to note is how elements are added to the queue, which uses a background thread running tf.RandomShuffleQueue.enqueue() on the same queue:

  • Wait until the current size of the queue is less than its capacity.
  • Add the element to the queue.

As a result, the capacity and min_after_dequeue properties of the queue (plus the distribution of the input data being enqueued) determine the population from which tf.train.shuffle_batch() will sample. It appears that the data in your input files is ordered, so you are relying completely on the tf.train.shuffle_batch() function for randomness.
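To make the "sliding window" effect concrete, here is a rough, single-threaded Python sketch of the two procedures above. The function name, its parameters, and the assumption that the reader always refills the queue to capacity before each dequeue are mine for illustration; this is not TensorFlow's actual multi-threaded implementation.

import random

def simulate_shuffle_batch(num_examples, batch_size, capacity, min_after_dequeue,
                           num_batches, seed=0):
  """Rough approximation of tf.train.shuffle_batch() over an ordered input."""
  rng = random.Random(seed)
  queue, next_example, batches = [], 0, []
  for _ in range(num_batches):
    batch = []
    while len(batch) < batch_size:
      # "Enqueue thread": add ordered examples while the queue is below capacity.
      while len(queue) < capacity:
        queue.append(next_example % num_examples)  # the input file is re-read forever
        next_example += 1
      # dequeue_many() step: once the queue holds at least min_after_dequeue + 1
      # elements, remove one uniformly at random and add it to the batch.
      assert len(queue) >= min_after_dequeue + 1
      batch.append(queue.pop(rng.randrange(len(queue))))
    batches.append(batch)
  return batches

# The question's first setting: batch_size=100, min_queue_examples=40, so
# capacity = 40 + 100 = 140. The first batch can only contain examples that
# appear near the start of the ordered file -- a narrow "sliding window".
first_batch = simulate_shuffle_batch(1681, 100, 140, 40, num_batches=1)[0]
print(min(first_batch), max(first_batch))  # the maximum stays far below 1681

Increasing min_after_dequeue (and hence the capacity) widens that window, which is why larger min_queue_examples values look more random in the plots.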

Taking your visualizations in turn:

  1. If capacity and min_after_dequeue are small relative to the dataset, the "shuffling" will select random elements from a small population resembling a "sliding window" across the dataset. With some small probability you will see old elements in the dequeued batch.

  2. If batch_size is large and min_after_dequeue is small relative to the dataset, the "shuffling" will again be selecting from a small "sliding window" across the dataset.

  3. If min_after_dequeue is large relative to batch_size and the size of the dataset, you will see (approximately) uniform samples from the data in each batch.

  4. If min_after_dequeue and batch_size are large relative to the size of the dataset, you will see (approximately) uniform samples from the data in each batch.

  5. In the case where min_after_dequeue is 4000, and batch_size is 1681, note that the expected number of copies of each element in the queue when it samples is 4000 / 1681 = 2.38, so it is more likely that some elements will be sampled more than once (and less likely that you will sample each unique element exactly once); see the sketch below.
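A back-of-the-envelope way to see case 5 is to sample 1681 elements without replacement from a 4000-element pool holding about 2.38 copies of each example and count the unique examples. The exact pool contents below (two full passes over the file plus part of a third) are an assumption for illustration, not the real queue state:

import numpy as np

rng = np.random.default_rng(0)
dataset = np.arange(1681)
# Roughly 4000 / 1681 = 2.38 copies of each example in the queue.
queue = np.concatenate([dataset, dataset, dataset[:4000 - 2 * 1681]])
batch = rng.choice(queue, size=1681, replace=False)
print(np.unique(batch).size)  # noticeably fewer than 1681 unique examples

The examples missing from such a batch correspond to the unfilled regions in the batch_size=1681, min_queue_examples=4000 plot.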
