Is it possible to loop through all minibatches in a single tensorflow op using dataset/iterators?

Problem description

I'm working with the tf.data.dataset/iterator mechanism and trying to improve data loading performance. It occurred to me that offloading the entire minibatch loop from Python might help. My data is small enough that storing it on the CPU or GPU is no problem.

So, is it possible to loop an optimizer node over a full minibatched epoch within a single call to session.run?

The tensor returned by iterator.get_next() is only incremented once per session.run, which seems to make it impossible to iterate through a dataset of minibatches... but if it could be done, my CPU would only have to touch the Python thread once per epoch.
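For reference, here is a minimal sketch (my own simplification, not code from the question) of the usual dataset/iterator pattern, where Python drives one session.run per minibatch; the loss is a trivial stand-in just to keep the sketch self-contained:

import numpy as np
import tensorflow as tf

data    = np.random.rand(1000, 3).astype(np.float32)
targets = np.random.rand(1000, 1).astype(np.float32)

dataset  = tf.data.Dataset.from_tensor_slices((data, targets)).batch(32)
iterator = dataset.make_initializable_iterator()
x_batch, y_batch = iterator.get_next()

# The model, loss, and train op would be built from x_batch and y_batch here;
# a trivial stand-in loss keeps the sketch runnable on its own.
loss = tf.reduce_mean(tf.square(x_batch))

with tf.Session() as session:
    for epoch in range(2):
        session.run(iterator.initializer)
        while True:
            try:
                session.run(loss)        # one Python round-trip per minibatch
            except tf.errors.OutOfRangeError:
                break                    # end of epoch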

UPDATE: @muskrat's suggestion to use tf.slice can be used for this purpose. See my subsequent non-answer with a schematic implementation of this using tf.while_loop. However, the question is whether this can be accomplished using datasets/iterators... and I'd still like to know.
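For concreteness, here is a tiny sketch (my addition, not muskrat's exact code) of how tf.slice can pull the k-th minibatch out of a tensor that already lives in the graph; the begin argument is the row offset and the size argument is [batch_size, all columns]:

import numpy as np
import tensorflow as tf

data       = tf.constant(np.arange(20).reshape(10, 2), dtype=tf.float32)
batch_size = 4
k          = tf.placeholder(tf.int32, shape=[], name='k')  # minibatch index

# begin = [row offset, 0]; size = [batch_size, -1], where -1 means "all columns".
minibatch = tf.slice(data, [k * batch_size, 0], [batch_size, -1])

with tf.Session() as session:
    print(session.run(minibatch, feed_dict={k: 1}))  # rows 4..7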

Recommended answer

This "answer" is an implementation of muskrat's tf.slice suggestion, with the details of tf.while_loop worked out (with help from How to use tf.while_loop() in tensorflow and https://www.tensorflow.org/api_docs/python/tf/while_loop).

Unless your data and model are small enough that you're bottlenecked by Python I/O (like me!), this solution is probably academic.

Pros:

  • Trains over minibatches without returning to the Python thread.
  • Uses only ops that have GPU implementations, meaning the entire graph can be placed on the GPU.
  • On my small dataset, which is presumably bottlenecked by Python I/O, this solution is twice the speed of my dataset/iterator version (which touches Python once per minibatch) and four times the speed of passing minibatches through feed_dict.

Cons:

  • tf.while_loop is treacherous. It's challenging to understand when ops inside the loop body are evaluated and when the ops they depend on are evaluated, particularly given the (thin) official documentation and the limited Stack Overflow coverage.
  • What the tf.while_loop documentation doesn't spell out is that tensors created outside the body of the loop are only evaluated once, even if ops inside the body depend on them. This means the optimization, model, and loss have to be defined inside the loop. It also limits flexibility if you'd like to, e.g., call validation-loss ops between training epochs. Presumably this could be accomplished with tf.cond statements and appropriate flags passed in via feed_dict (a minimal sketch follows this list), but that is nowhere near as flexible or elegant as the dataset/iterator mechanism in tf.data.
  • Adding a shuffling operation at each epoch doesn't seem to be available on the GPU.
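As a minimal sketch of that tf.cond idea (my own assumption about how it could look, not something tested as part of this answer): a boolean flag fed through feed_dict selects which branch actually runs, so a validation loss could be evaluated between epochs without restructuring the graph. The names here (is_training, the stand-in losses) are hypothetical.

import numpy as np
import tensorflow as tf

is_training = tf.placeholder(tf.bool, shape=[], name='is_training')

train_data = tf.constant(np.random.rand(64, 3), dtype=tf.float32)
valid_data = tf.constant(np.random.rand(16, 3), dtype=tf.float32)
weights    = tf.Variable(tf.random_normal([3, 1]))

# Ops created inside the branch functions only run when their branch is selected.
loss = tf.cond(is_training,
               lambda: tf.reduce_mean(tf.square(tf.matmul(train_data, weights))),
               lambda: tf.reduce_mean(tf.square(tf.matmul(valid_data, weights))))

with tf.Session() as session:
    session.run(tf.global_variables_initializer())
    print(session.run(loss, feed_dict={is_training: False}))  # validation branch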

Here's my schematic code (I've omitted the variable and model definitions for brevity):

import tensorflow as tf

def buildModel(info, training_data, training_targets):

    graph = tf.Graph()

    with graph.as_default():
        # batch_size is fed in from Python once per epoch via feed_dict.
        batch_size = tf.placeholder(tf.float32, name='batch_size')

        # Initial values of the loop variables for tf.while_loop.
        batchCounter = tf.Variable(0, dtype=tf.float32, trainable=False)
        lossList     = tf.Variable(tf.zeros([0, 1]), trainable=False)

        # In a full example, I'd normalize my data here.  And possibly shuffle.
        tf_training_data    = tf.constant(training_data,    dtype=tf.float32)
        tf_training_targets = tf.constant(training_targets, dtype=tf.float32)

        # For brevity, I'll spare the definitions of my variables.  Because tf.Variables
        # are essentially treated as globals in the model and are manipulated directly
        # (e.g. by the optimizer's apply ops), they can reside outside runMinibatch,
        # the body of tf.while_loop.

        # weights_1 =
        # biases_1  =
        # etc.

        def moreMinibatches(batchCount, lossList):
            # Keep looping while another complete minibatch fits in the dataset.
            return (batchCount + 1) * batch_size <= len(training_data)

        def runMinibatch(batchCount, lossList):
            # These tensors and ops have to be defined inside runMinibatch, otherwise
            # they're not updated as tf.while_loop loops.  That means the slices,
            # the model definition, the loss tensor, and the training op.

            dat_batch  = tf.slice(tf_training_data,
                                  [tf.cast(batchCount * batch_size, tf.int32), 0],
                                  [tf.cast(batch_size, tf.int32), -1])
            targ_batch = tf.slice(tf_training_targets,
                                  [tf.cast(batchCount * batch_size, tf.int32), 0],
                                  [tf.cast(batch_size, tf.int32), -1])

            # Here's where you'd define the model as a function of the weights and
            # biases above and dat_batch.

            # model = <insert here>

            loss      = tf.reduce_mean(tf.squared_difference(model, targ_batch))
            optimizer = tf.train.AdagradOptimizer(0.01)  # for example

            train_op = optimizer.minimize(loss, name='optimizer')

            # control_dependencies ensures that train_op is run before returning,
            # even though the return values don't explicitly depend on it.
            with tf.control_dependencies([train_op]):
                return batchCount + 1, tf.concat([lossList, [[loss]]], 0)

        # So, the idea is that this trains a full epoch without returning to Python.
        trainMinibatches = tf.while_loop(moreMinibatches, runMinibatch,
                                         [batchCounter, lossList],
                                         shape_invariants=[batchCounter.get_shape(),
                                                           tf.TensorShape([None, 1])])

        return (graph,
                {'trainMinibatches': trainMinibatches,
                 'batchCounter'    : batchCounter,
                })

numEpochs     = 100  # e.g.
minibatchSize = 32
# training_dataset = <data here>
# training_targets = <targets here>

graph, ops = buildModel(info, training_dataset, training_targets)

with graph.as_default(), tf.Session(graph=graph) as session:
    tf.global_variables_initializer().run()

    for i in range(numEpochs):
        # This op trains on all the complete minibatches that fit in the dataset.
        # finalBatchCount is the number of complete minibatches; lossList holds
        # the loss from each minibatch step.
        finalBatchCount, lossList = session.run(ops['trainMinibatches'],
                                                feed_dict={'batch_size:0': minibatchSize})

        print('minibatch losses at Epoch', i, ': ', lossList)
