Upgrade to tf.dataset not working properly when parsing csv

Question

I have a GCMLE experiment and I am trying to upgrade my input_fn to use the new tf.data functionality. I have created the following input_fn based off of this sample

def input_fn(...):
    dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards) # shuffle up the list of input files
    dataset = dataset.interleave(lambda filename: # mix together records from cycle_length number of shards
                tf.data.TextLineDataset(filename).skip(1).map(lambda row: parse_csv(row, hparams)), cycle_length=5) 
    if shuffle:
      dataset = dataset.shuffle(buffer_size = 10000)
    dataset = dataset.repeat(num_epochs)
    dataset = dataset.batch(batch_size)
    iterator = dataset.make_one_shot_iterator()
    features = iterator.get_next()

    labels = features.pop(LABEL_COLUMN)

    return features, labels

My parse_csv is the same as what I used previously, but it is not currently working. I can fix some of the issues, but I don't fully understand why I am having them. Here is the start of my parse_csv() function:

def parse_csv(..):
    columns = tf.decode_csv(rows, record_defaults=CSV_COLUMN_DEFAULTS)
    raw_features = dict(zip(FIELDNAMES, columns))

    words = tf.string_split(raw_features['sentences']) # splitting words
    vocab_table = tf.contrib.lookup.index_table_from_file(vocabulary_file = hparams.vocab_file,
                default_value = 0)

....

  1. Right away this tf.string_split() stops working and the error is ValueError: Shape must be rank 1 but is rank 0 for 'csv_preprocessing/input_sequence_generation/StringSplit' (op: 'StringSplit') with input shapes: [], []. This is easily solved by packing raw_features['sentences'] into a tensor via [raw_features['sentences']], but I do not understand why this is needed with this dataset approach, or why it worked fine in the old version. To make the shapes match up with the rest of my model, I end up needing to remove this extra dimension at the end via words = tf.squeeze(words, 0), because I added this "unnecessary" dimension to the tensor (see the sketch after this list).

  2. For whatever reason, I am also getting an error that the table is not initialized: tensorflow.python.framework.errors_impl.FailedPreconditionError: Table not initialized. However, this code works completely fine with my old input_fn() (see below), so I don't know why I would now need to initialize the tables. I have not figured out a solution to this part. Is there anything I am missing to be able to use tf.contrib.lookup.index_table_from_file within my parse_csv function?
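
For reference, here is a rough sketch of the workaround from issue 1 inside parse_csv(). Only the list wrapper and the tf.squeeze() call come from the question above; the sparse_tensor_to_dense step and the 'words' feature name are assumptions about how the pieces fit together (tf.string_split returns a SparseTensor, so it has to be densified before the squeeze).

def parse_csv(rows, hparams):
    columns = tf.decode_csv(rows, record_defaults=CSV_COLUMN_DEFAULTS)
    raw_features = dict(zip(FIELDNAMES, columns))

    # Wrap the scalar row string in a list so tf.string_split sees a rank-1 input.
    words = tf.string_split([raw_features['sentences']])

    # tf.string_split returns a SparseTensor of shape [1, max_tokens]; densify it
    # (an assumption here) and squeeze away the leading dimension the wrapper added.
    words = tf.sparse_tensor_to_dense(words, default_value='')
    words = tf.squeeze(words, 0)

    raw_features['words'] = words
    return raw_features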

For reference, this is my old input_fn() that still does work:

def input_fn(...):
    filename_queue = tf.train.string_input_producer(
        tf.train.match_filenames_once(filenames),
        num_epochs=num_epochs, shuffle=shuffle, capacity=32)
    reader = tf.TextLineReader(skip_header_lines=skip_header_lines)

    _, rows = reader.read_up_to(filename_queue, num_records=batch_size)

    features = parse_csv(rows, hparams)

    if shuffle:
        features = tf.train.shuffle_batch(
            features,
            batch_size,
            min_after_dequeue=2 * batch_size + 1,
            capacity=batch_size * 10,
            num_threads=multiprocessing.cpu_count(),
            enqueue_many=True,
            allow_smaller_final_batch=True
        )
    else:
        features = tf.train.batch(
            features,
            batch_size,
            capacity=batch_size * 10,
            num_threads=multiprocessing.cpu_count(),
            enqueue_many=True,
            allow_smaller_final_batch=True
        )

    labels = features.pop(LABEL_COLUMN)

    return features, labels

UPDATE TF 1.7

I am revisiting this with TF 1.7 (which should have all of the TF 1.6 features mentioned in @mrry's answer), but I'm still unable to replicate the old performance. With my old input_fn() I am able to get around 13 steps/sec. The new function that I am using is as follows:

def input_fn(...):
    files = tf.data.Dataset.list_files(filenames).shuffle(num_shards)
    dataset = files.apply(tf.contrib.data.parallel_interleave(lambda filename: tf.data.TextLineDataset(filename).skip(1), cycle_length=num_shards))
    dataset = dataset.apply(tf.contrib.data.map_and_batch(lambda row:
            parse_csv_dataset(row, hparams = hparams), 
            batch_size = batch_size, 
            num_parallel_batches = multiprocessing.cpu_count())) 
    dataset = dataset.prefetch(1)
    if shuffle:
        dataset = dataset.shuffle(buffer_size = 10000)
    dataset = dataset.repeat(num_epochs)
    iterator = dataset.make_initializable_iterator()
    features = iterator.get_next()
    tf.add_to_collection(tf.GraphKeys.TABLE_INITIALIZERS, iterator.initializer)

    labels = {key: features.pop(key) for key in LABEL_COLUMNS}

    return features, labels 

I believe that I am following all of the performance guidelines, such as 1) using prefetch, 2) using map_and_batch with num_parallel_batches = cores, 3) using parallel_interleave, and 4) applying shuffle before the repeat. The only step I am not using is the cache suggestion, but I would expect that to really only help for epochs beyond the first one, along with "applying interleave, prefetch and shuffle first." However, I found that having prefetch and shuffle after the map_and_batch was a ~10% speedup.
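
For what it's worth, here is a rough sketch of where a cache() call could slot into the pipeline above, assuming the parsed batches fit in memory (all names come from the TF 1.7 snippet earlier; this is untested):

    dataset = dataset.apply(tf.contrib.data.map_and_batch(lambda row:
            parse_csv_dataset(row, hparams = hparams),
            batch_size = batch_size,
            num_parallel_batches = multiprocessing.cpu_count()))
    # Keep parsed batches in memory so epochs after the first skip the CSV parsing work.
    dataset = dataset.cache()
    dataset = dataset.prefetch(1)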

BUFFER ISSUE

The first performance issue I am noticing is that with my old input_fn() it took me about 13 wall-clock minutes to get through 20k steps, yet even with the buffer_size of 10,000 (which I take to mean we are waiting until we have 10,000 batches processed) I am still waiting more than 40 minutes for the buffer to fill up. Does it make sense for this to take so long? If I know that my sharded .csv's on GCS are already randomized, is it acceptable to make this shuffle/buffer size smaller? I am trying to replicate the behavior of tf.train.shuffle_batch() -- however, it seems that at worst it should take the same 13 mins that it took to reach 10k steps in order to fill up the buffer?
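
One way to sanity-check the smaller-buffer idea is to move the shuffle before map_and_batch so it operates on individual CSV rows rather than whole batches. This is just a sketch on top of the TF 1.7 pipeline above; the 10 * batch_size buffer is an arbitrary placeholder, not a recommendation.

    files = tf.data.Dataset.list_files(filenames).shuffle(num_shards)
    dataset = files.apply(tf.contrib.data.parallel_interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length=num_shards))
    # Shuffle individual rows; with pre-randomized shards a modest buffer
    # mostly just mixes records across shards and fills up quickly.
    dataset = dataset.shuffle(buffer_size=10 * batch_size)
    dataset = dataset.apply(tf.contrib.data.map_and_batch(
        lambda row: parse_csv_dataset(row, hparams=hparams),
        batch_size=batch_size,
        num_parallel_batches=multiprocessing.cpu_count()))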

STEPS/SEC

Even once the buffer has filled up, the global steps/sec tops out around 3 steps/sec (and is often as low as 2 steps/sec) on the same model that gets ~13 steps/sec with the previous input_fn().

SLOPPY INTERLEAVE

I finally tried to replace parallel_interleave() with sloppy_interleave(), as this is another suggestion from @mrry. When I switched to sloppy_interleave I got 14 steps/sec! I know this means that it is not deterministic, but should that really just mean it is not deterministic from one run (or epoch) to the next? Or are there larger implications? Should I be concerned about any real difference between the old shuffle_batch() method and sloppy_interleave? Does the fact that this results in a 4-5x improvement suggest what the previous blocking factor was?
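
For reference, the swap itself appears to be a one-line change to the TF 1.7 pipeline above (sloppy_interleave takes the same map function and cycle_length, but relaxes the strict round-robin ordering across shards):

    dataset = files.apply(tf.contrib.data.sloppy_interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length=num_shards))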

Answer

In TF 1.4 (which is currently the latest version of TF that works with GCMLE) you will not be able to use make_one_shot_iterator() with lookup tables (see relevant post); you will need to use Dataset.make_initializable_iterator() and then add iterator.initializer to the default TABLE_INITIALIZERS collection (from this post). Here is what the input_fn() should look like:

def input_fn(...):
  dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards)

  # Define `vocab_table` outside the map function and use it in `parse_csv()`.
  vocab_table = tf.contrib.lookup.index_table_from_file(
      vocabulary_file=hparams.vocab_file, default_value=0)

  dataset = dataset.interleave(
      lambda filename: (tf.data.TextLineDataset(filename)
                        .skip(1)
                        .map(lambda row: parse_csv(row, hparams),
                             num_parallel_calls=multiprocessing.cpu_count())),
      cycle_length=5) 

  if shuffle:
    dataset = dataset.shuffle(buffer_size=10000)
  dataset = dataset.repeat(num_epochs)
  dataset = dataset.batch(batch_size)
  iterator = dataset.make_initializable_iterator()
  features = iterator.get_next()

  # add iterator.initializer to the collection so it is handled by the default table initializers
  tf.add_to_collection(tf.GraphKeys.TABLE_INITIALIZERS, iterator.initializer) 

  labels = features.pop(LABEL_COLUMN)

  return features, labels
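
As a rough usage sketch (the estimator and parameter names here are placeholders, not from the original post), the input_fn is passed to the Estimator as a closure; since iterator.initializer was added to TABLE_INITIALIZERS, the default Scaffold runs it along with the vocabulary table initializer when the training session is created:

  estimator.train(
      input_fn=lambda: input_fn(filenames, hparams, batch_size=batch_size,
                                num_epochs=num_epochs, shuffle=True),
      max_steps=train_steps)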
