TensorFlow: How can I evaluate a validation data queue multiple times during training?


Question

tl;dr

How can I evaluate a validation set after every K training iterations, using separate queues for training and validation data, without resorting to separate tf.Sessions in multiple processes? There doesn't seem to be a clean way to achieve this, given my particular problem, and my current workaround (which I thought would work) gives me undefined behavior. Help!

The whole story

I want to evaluate a validation set every K training iterations, and I cannot figure out how to implement this properly in TensorFlow. This should be one of the most common operations, yet it feels that TensorFlow's API/architecture is working against me here or is at least making things unnecessarily difficult.

My assumptions are:

  • [A1] The multi-process model for training/validation as described here https://www.tensorflow.org/how_tos/reading_data/#multiple_input_pipelines is not applicable to my problem, as I have to assume there is not enough GPU memory available to load the variables twice.
  • [A2] I want to evaluate on the validation set every K training iterations.
  • [A3] Neither training nor validation data can simply be read from disk; both are generated on-the-fly. This makes it impossible to reliably pre-compute the size of the validation set in advance.
  • [A4] The validation set is too large to pre-compute and store on disk.
  • [A5] The effective validation set size is not necessarily a multiple of the batch size.

The training input pipeline is set up as follows:

  • A tf.train.slice_input_producer() generates a (shuffled) list of filenames, each referring to raw input data.
  • A custom data generation function generates a variable number of training exemplars/labels from each chunk of raw input data.
  • The generated training exemplars/labels are queued via tf.train.shuffle_batch() before being fed into the network.

Due to [A3], [A4], [A5], the validation input pipeline is set up in an almost identical way, except that the final input queue is generated via tf.train.batch(), since shuffling is not desirable. Due to the above assumptions, a feed_dict-based approach is infeasible as well, and it also seems incompatible with higher-level functions such as tf.train.batch.
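
For concreteness, such a pipeline might be wired up roughly as in the following sketch. The generate_examples helper, the .npz file format, and all shapes, batch sizes and capacities are illustrative assumptions, not part of the original setup:

import numpy as np
import tensorflow as tf

def generate_examples(filename):
    # Hypothetical generator: expands one chunk of raw input data into a
    # variable number of exemplar/label pairs (shapes are made up here).
    raw = np.load(filename.decode())
    return raw["x"].astype(np.float32), raw["y"].astype(np.int64)

def input_pipeline(filenames, train, num_epochs=None):
    # One filename per step, optionally shuffled and limited to num_epochs passes.
    filename, = tf.train.slice_input_producer([filenames], shuffle=train,
                                              num_epochs=num_epochs)
    # Wrap the Python generator; static shapes must be set after tf.py_func.
    exemplars, labels = tf.py_func(generate_examples, [filename],
                                   [tf.float32, tf.int64])
    exemplars.set_shape([None, 64, 64, 1])
    labels.set_shape([None])
    if train:
        # Training: shuffled batches via a RandomShuffleQueue.
        return tf.train.shuffle_batch([exemplars, labels], batch_size=32,
                                      capacity=2000, min_after_dequeue=500,
                                      enqueue_many=True, name="train_batch")
    # Validation: deterministic order; the final batch may be smaller.
    return tf.train.batch([exemplars, labels], batch_size=32, capacity=2000,
                          enqueue_many=True, allow_smaller_final_batch=True,
                          name="eval_batch")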

However, a straightforward implementation using two different sets of queues for training and validation does not work. As far as I understand, I have two options:

  • [B1] Set the num_epochs argument of the validation tf.train.slice_input_producer to None.

    In this case, the validation set is cycled through endlessly, but I would need to know the size of the validation set in advance to explicitly limit the number of batches to evaluate per pass through the validation set. Furthermore, if the validation set size is not divisible by the batch size, the final batch would always spill over into the next pass. As this would shift the order in which the validation data are evaluated each time, this is not acceptable.

  • [B2] Set the num_epochs argument of the validation tf.train.slice_input_producer to 1, and additionally set the allow_smaller_final_batch argument of the tf.train.batch function to True.

    In this case, the validation set is cycled through exactly once, after which the respective queue is closed forever. Without further measures, this makes it impossible to evaluate the validation set a second time. Since I do not know of a good way to reopen a queue in TensorFlow, I need to work around this limitation (see the sketch after this list).
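
To make the [B2] behavior concrete, a single pass over such a num_epochs=1 validation queue typically looks like the hedged sketch below, where eval_batch stands for the output of the validation tf.train.batch() call and eval_fn is a hypothetical per-batch evaluation callback. Once the single epoch is exhausted and the queue has drained, every further dequeue raises tf.errors.OutOfRangeError, and the queue stays closed:

import tensorflow as tf

def eval_one_pass(sess, eval_batch, eval_fn):
    """Consumes validation batches until the num_epochs=1 queue closes."""
    results = []
    try:
        while True:
            batch = sess.run(eval_batch)   # the last batch may be smaller
            results.append(eval_fn(batch))
    except tf.errors.OutOfRangeError:
        pass                               # single epoch exhausted; queue is closed for good
    return results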

Due to the greater limitations of option [B1], I chose to work around the issues of option [B2] instead. The (pseudo-)code outlining my current approach is as follows:

The training loop should be fairly canonical. Every K iterations, a function to evaluate the validation set is called. Note that I only start the queues whose names start with "train_"; these are the queues set up for collecting generated training data. To do this, I created two helper functions, get_queues_by_name and start_queue_runners.

def train_loop(train_ops, vali_ops, ...):
    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        sess.run([tf.initialize_all_variables(), tf.initialize_local_variables()])
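        # 'loader' and 'load_latest_snapshot' are assumed helpers in this
        # (pseudo-)code: presumably a tf.train.Saver and a restore wrapper.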
        load_latest_snapshot(sess, loader, snapshot_file)

        # Launch the queue runners
        queues = get_queues_by_name("train")
        threads = start_queue_runners(sess, coord, queues)

        try:
            for step in range(start_iteration, num_train_iterations):
                # Runs the session on validation set
                if step % K == 0:
                    validation_results = run_validation(vali_ops, snapshot_file)

                # TRAINING:
                # ...

        except Exception as e:
            coord.request_stop(e)
        finally:
            coord.request_stop()
            coord.join(threads)

The helper functions look like this:

def get_queues_by_name(name):
    """Retrieves all queues that contain the string given by 'name'"""
    all_queues = tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)
    return [q for q in all_queues if name in q.name]


def start_queue_runners(session, coordinator, queues):
    """Similar to tf.train.start_queue_runners but now accepts a list of queues instead of a graph collection"""
    with session.graph.as_default():
        threads = []
        for queue in queues:
            log("Queue", "Starting queue '%s'" % queue.name, level=2)
            threads.extend(queue.create_threads(session, coordinator, daemon=True, start=True))
    return threads

In the run_validation function, my chosen workaround against the issue of a closed queue is to create a new tf.Session. I also only start the threads associated with the queue collecting validation set data.

def run_validation(ops, snapshot_file):  # Called inside train_loop()
    results = None
    loader = tf.train.Saver()

    with tf.Session() as sess:
        coord = tf.train.Coordinator()
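        # Only local variables (e.g. the epoch counter created by num_epochs=1)
        # are re-initialized here; model weights come from the snapshot below.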
        sess.run([tf.initialize_local_variables()])
        load_latest_snapshot(sess, loader, snapshot_file)

        # Launch the queue runners
        queues = get_queues_by_name("eval")
        threads = start_queue_runners(sess, coord, queues)

        # Performs the inference in batches
        try:
            # Evaluate validation set:
            results = eval_in_batches(ops, sess)
        except Exception as e:
            coord.request_stop(e)
        finally:
            coord.request_stop()
            coord.join(threads)

    return results

I do not know whether creating a new tf.Session here is a good idea, but it seems like the only way to accomplish restarting the validation queue. Ideally, I also wouldn't want to re-load the model snapshot, as this seems conceptually unnecessary.

The issue with this code is that I see erratic/undefined behavior during runs, such as NaNs or Infs appearing inside the network during validation set evaluation. This seems to occur predominantly while the validation set queue is being filled at the same time as the training set queue (since the training queue stays open during validation set evaluation). For example, it very often happens if I evaluate the validation set at iteration 0 (when both queues still need to be filled). It almost seems as if the training/validation queues share some global state, even though they run in different sessions.

Can someone explain why this is happening, and how I can solve this more cleanly while taking my above assumptions [A1]-[A5] into account?

Solution

I'm currently facing a similar problem. So far I have avoided queues altogether and just feed in the data via feed_dict, but I'm obviously losing some performance by not using queues and parallelism (although I'm still happy with the current speed, since I did the same in Theano earlier). Now I want to redesign this to use queues, and stumbled upon this problem. There are several related issues.

I'm currently thinking about doing it this way:

  • In training, I want to use a RandomShuffleQueue, which makes this even more complicated. I think I will just ignore the problem: once the reader thread that enqueues tensors into the queue finishes, I will let the training stop, so I lose the remaining up-to-capacity items for this epoch and just use them for the next epoch. Maybe, to make it deterministic, I could keep reading from the queue in the train thread until only min_after_dequeue items are left.

  • In evaluation, I want to use the same graph and the same session. I can use tf.cond to read from another, separate queue instead of the RandomShuffleQueue. Alternatively, I could use feed_dict in evaluation. If I used a separate queue, it would be a FIFOQueue, and I would carefully track that I perform the right number of steps. I could also enqueue an extra dummy tensor that serves as an end_of_epoch flag, so the eval thread knows when to stop. A sketch of the tf.cond-based switch follows this list.
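
A minimal sketch of that tf.cond idea, assuming hypothetical dtypes, shapes, capacities and queue names (the enqueue ops and queue runners that feed both queues are omitted); whether stateful dequeue ops behave as expected inside tf.cond should be verified for the TensorFlow version in use:

import tensorflow as tf

# Training data comes from a RandomShuffleQueue, validation data from a FIFOQueue.
train_queue = tf.RandomShuffleQueue(capacity=2000, min_after_dequeue=500,
                                    dtypes=[tf.float32, tf.int64],
                                    shapes=[[64, 64, 1], []], name="train_queue")
eval_queue = tf.FIFOQueue(capacity=2000, dtypes=[tf.float32, tf.int64],
                          shapes=[[64, 64, 1], []], name="eval_queue")

is_training = tf.placeholder_with_default(True, shape=[], name="is_training")
batch_size = 32

# Each dequeue op is created inside its own branch, so only the branch selected
# at run time consumes items from its queue.
exemplars, labels = tf.cond(is_training,
                            lambda: train_queue.dequeue_many(batch_size),
                            lambda: eval_queue.dequeue_many(batch_size))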


In TensorFlow 1.2, there will be the tf.contrib.data interface (issue comment, documentation overview, API documentation), which provides the tf.contrib.data.Dataset API. It supports shuffling similar to tf.RandomShuffleQueue, as well as batching and looping over multiple epochs. Also, you access the data by creating an iterator over it, and the iterator can be reset. Some related StackOverflow questions cover this as well.
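
A hedged sketch of that approach with the TF 1.2-era tf.contrib.data API (file lists, the omitted map step, batch size and K are illustrative assumptions): the training data repeats endlessly behind a one-shot iterator, while the validation data sits behind an initializable iterator that can simply be re-initialized every K steps, which is exactly the "reopen the validation set" behavior the question asks for:

import tensorflow as tf

train_files = ["train_0.npz", "train_1.npz"]   # hypothetical file lists
eval_files = ["eval_0.npz", "eval_1.npz"]
K = 100                                        # evaluate every K training steps

def make_dataset(files, shuffle):
    ds = tf.contrib.data.Dataset.from_tensor_slices(files)
    if shuffle:
        ds = ds.shuffle(buffer_size=len(files)).repeat()  # loop endlessly for training
    # A .map() call would expand each file into exemplars/labels here (omitted).
    return ds.batch(32)

train_batch = make_dataset(train_files, shuffle=True).make_one_shot_iterator().get_next()
eval_iterator = make_dataset(eval_files, shuffle=False).make_initializable_iterator()
eval_batch = eval_iterator.get_next()

with tf.Session() as sess:
    for step in range(1000):
        sess.run(train_batch)                       # stands in for a training step
        if step % K == 0:
            sess.run(eval_iterator.initializer)     # "reopen" the validation data
            while True:
                try:
                    sess.run(eval_batch)            # stands in for an eval step
                except tf.errors.OutOfRangeError:
                    break                           # one full validation pass done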
