Streaming large training and test files into Tensorflow's DNNClassifier

Problem description

I have a huge training CSV file (709M) and a large testing CSV file (125M) that I want to send into a DNNClassifier in the context of using the high-level Tensorflow API.

It appears that the input_fn param accepted by fit and evaluate must hold all feature and label data in memory, but I currently would like to run this on my local machine, and thus expect it to run out of memory rather quickly if I read these files into memory and then process them.
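
To make that concrete, here is a rough sketch of the kind of in-memory input_fn I mean (the column layout and the 'x' key are just placeholders): it reads the entire CSV into a DataFrame and wraps it in constant tensors, which is exactly what won't fit comfortably for a 709M file.

import pandas as pd
import tensorflow as tf

def train_input_fn():
    # Loads the entire 709M training CSV into RAM up front.
    df = pd.read_csv('my_train_dataset.csv', header=None)
    labels = tf.constant(df.iloc[:, -1].values, dtype=tf.int32)
    features = {'x': tf.constant(df.iloc[:, :-1].values, dtype=tf.float32)}
    return features, labels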

I skimmed the doc on streamed-reading of data, but the sample code for reading CSVs appears to be for the low-level Tensorflow API.

And - if you'll forgive a bit of whining - it seems overly-complex for the trivial use case of sending well-prepared files of training and test data into an Estimator ... although, perhaps that level of complexity is actually required for training and testing large volumes of data in Tensorflow?

In any case, I'd really appreciate an example of using that approach with the high-level API, if it's even possible, which I'm beginning to doubt.

After poking around, I did manage to find DNNClassifier#partial_fit, and will attempt to use it for training.

Examples of how to use this method would save me some time, though hopefully I'll stumble into the correct usage in the next few hours.

However, there doesn't seem to be a corresponding DNNClassifier#partial_evaluate ... though I suspect that I could break-up the testing data into smaller pieces and run DNNClassifier#evaluate successively on each batch, which might actually be a great way to do it since I could segment the testing data into cohorts, and thereby obtain per-cohort accuracy.
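
Something like the following rough sketch is what I have in mind, assuming evaluate accepts in-memory x/y arrays the same way fit does (the chunk size and column layout are placeholders):

import numpy as np
import pandas as pd

def evaluate_in_chunks(classifier, test_csv, chunk_size=50000):
    # Hypothetical helper: stream the test CSV in cohorts and call
    # evaluate() once per cohort, collecting per-cohort metrics.
    results = []
    for chunk in pd.read_csv(test_csv, header=None, chunksize=chunk_size):
        x = chunk.iloc[:, :-1].values.astype(np.float32)
        y = chunk.iloc[:, -1].values.astype(np.int32)
        results.append(classifier.evaluate(x=x, y=y))
    return results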

==== Update ====

Short version:

  1. DomJack's recommendation should be the accepted answer.

However, my Mac's 16GB of RAM is enough to hold the entire 709Mb training data set in memory without crashing. So, while I will use the DataSets feature when I eventually deploy the app, I'm not using it yet for local dev work.

Longer version:

I started by using the partial_fit API as described above, but upon every use it emitted a warning.

So, I went to look at the source for the method here, and discovered that its complete implementation looks like this:

logging.warning('The current implementation of partial_fit is not optimized'
                ' for use in a loop. Consider using fit() instead.')
return self.fit(x=x, y=y, input_fn=input_fn, steps=steps,
                batch_size=batch_size, monitors=monitors)

... which reminds me of this scene from Hitchhiker's Guide:

Arthur Dent: What happens if I press this button?

Ford Prefect: I wouldn't-

Arthur Dent: Oh.

Ford Prefect: What happened?

Arthur Dent: A sign lit up, saying 'Please do not press this button again'.

Which is to say: partial_fit seems to exist for the sole purpose of telling you not to use it.

Furthermore, the model generated by using partial_fit iteratively on training file chunks was much smaller than the one generated by using fit on the whole training file, which strongly suggests that only the last partial_fit training chunk actually "took".

Recommended answer

Check out the tf.data.Dataset API. There are a number of ways to create a dataset. I'll outline four - but you'll only have to implement one.

I assume each row of your csv files is n_features float values followed by a single int value.

The easiest way to get started is to wrap a native python generator. This can have performance issues, but may be fine for your purposes.

def read_csv(filename):
    with open(filename, 'r') as f:
        for line in f.readlines():
            record = line.rstrip().split(',')
            features = [float(n) for n in record[:-1]]
            label = int(record[-1])
            yield features, label

def get_dataset():
    filename = 'my_train_dataset.csv'
    generator = lambda: read_csv(filename)
    return tf.data.Dataset.from_generator(
        generator, (tf.float32, tf.int32), ((n_features,), ()))

This approach is highly versatile and allows you to test your generator function (read_csv) independently of TensorFlow.
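
For example, a quick sanity check of the generator on its own, before wiring it into a dataset:

import itertools

# Print the first few parsed records straight from the generator.
for features, label in itertools.islice(read_csv('my_train_dataset.csv'), 3):
    print(len(features), label)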

Supporting tensorflow versions 1.12+, tensorflow datasets is my new favourite way of creating datasets. It automatically serializes your data, collects statistics and makes other metadata available to you via info and builder objects. It can also handle automatic downloading and extracting, making collaboration simple.

import tensorflow_datasets as tfds

class MyCsvDatasetBuilder(tfds.core.GeneratorBasedBuilder):
  VERSION = tfds.core.Version("0.0.1")

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        description=(
            "My dataset"),
        features=tfds.features.FeaturesDict({
            "features": tfds.features.Tensor(
              shape=(FEATURE_SIZE,), dtype=tf.float32),
            "label": tfds.features.ClassLabel(
                names=CLASS_NAMES),
            "index": tfds.features.Tensor(shape=(), dtype=tf.float32)
        }),
        supervised_keys=("features", "label"),
    )

  def _split_generators(self, dl_manager):
    paths = dict(
      train='/path/to/train.csv',
      test='/path/to/test.csv',
    )
    # better yet, if the csv files were originally downloaded, use
    # urls = dict(train=train_url, test=test_url)
    # paths = dl_manager.download(urls)
    return [
        tfds.core.SplitGenerator(
            name=tfds.Split.TRAIN,
            num_shards=10,
            gen_kwargs=dict(csv_path=paths['train'])),
        tfds.core.SplitGenerator(
            name=tfds.Split.TEST,
            num_shards=2,
            gen_kwargs=dict(csv_path=paths['test']))
    ]

  def _generate_examples(self, csv_path):
    with open(csv_path, 'r') as f:
        for i, line in enumerate(f.readlines()):
            record = line.rstrip().split(',')
            features = [float(n) for n in record[:-1]]
            label = int(record[-1])
            yield dict(features=features, label=label, index=i)

Usage:

builder = MyCsvDatasetBuilder()
builder.download_and_prepare()  # will only take time to run first time
# as_supervised makes output (features, label) - good for model.fit
datasets = builder.as_dataset(as_supervised=True)

train_ds = datasets['train']
test_ds = datasets['test']
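
From there the splits behave like any other tf.data pipeline, e.g. (buffer and batch sizes below are arbitrary choices):

train_ds = train_ds.shuffle(1024).repeat().batch(32).prefetch(1)
test_ds = test_ds.batch(32)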

Wrap an index-based python function

One of the downsides of the above is that shuffling the resulting dataset with a shuffle buffer of size n requires n examples to be loaded. This will either create periodic pauses in your pipeline (large n) or result in potentially poor shuffling (small n).

def get_record(i):
    # load the ith record using standard python, return numpy arrays
    return features, labels

def get_inputs(batch_size, is_training):

    def tf_map_fn(index):
        features, labels = tf.py_func(
            get_record, (index,), (tf.float32, tf.int32), stateful=False)
        features.set_shape((n_features,))
        labels.set_shape(())
        # do data augmentation here
        return features, labels

    epoch_size = get_epoch_size()
    dataset = tf.data.Dataset.from_tensor_slices(tf.range(epoch_size))
    if is_training:
        dataset = dataset.repeat().shuffle(epoch_size)
    dataset = dataset.map(tf_map_fn, num_parallel_calls=8)
    dataset = dataset.batch(batch_size)
    # prefetch data to CPU while GPU processes previous batch
    dataset = dataset.prefetch(1)
    # Also possible
    # dataset = dataset.apply(
    #     tf.contrib.data.prefetch_to_device('/gpu:0'))
    features, labels = dataset.make_one_shot_iterator().get_next()
    return features, labels

In short, we create a dataset just of the record indices (or any small record ID which we can load entirely into memory). We then do shuffling/repeating operations on this minimal dataset, then map the index to the actual data via tf.data.Dataset.map and tf.py_func. See the Using with Estimators and Testing in isolation sections below for usage. Note this requires your data to be accessible by row, so you may need to convert from csv to some other format.
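
As a hedged sketch of that conversion, assuming you dump the parsed CSV once to a .npy file so rows can be fetched by index without loading everything, get_record and get_epoch_size might look like:

import numpy as np

# Memory-mapped array of shape (num_records, n_features + 1), written once
# beforehand with np.save from the parsed CSV.
_data = np.load('my_train_dataset.npy', mmap_mode='r')

def get_record(i):
    row = _data[i]
    return row[:-1].astype(np.float32), np.int32(row[-1])

def get_epoch_size():
    return _data.shape[0]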

You can also read the csv file directly using a tf.data.TextLineDataset.

def get_record_defaults():
  zf = tf.zeros(shape=(1,), dtype=tf.float32)
  zi = tf.ones(shape=(1,), dtype=tf.int32)
  return [zf]*n_features + [zi]

def parse_row(tf_string):
    data = tf.decode_csv(
        tf.expand_dims(tf_string, axis=0), get_record_defaults())
    features = data[:-1]
    features = tf.stack(features, axis=-1)
    label = data[-1]
    features = tf.squeeze(features, axis=0)
    label = tf.squeeze(label, axis=0)
    return features, label

def get_dataset():
    dataset = tf.data.TextLineDataset(['data.csv'])
    return dataset.map(parse_row, num_parallel_calls=8)

The parse_row function is a little convoluted since tf.decode_csv expects a batch. You can make it slightly simpler if you batch the dataset before parsing.

def parse_batch(tf_string):
    data = tf.decode_csv(tf_string, get_record_defaults())
    features = data[:-1]
    labels = data[-1]
    features = tf.stack(features, axis=-1)
    return features, labels

def get_batched_dataset(batch_size):
    dataset = tf.data.TextLineDataset(['data.csv'])
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(parse_batch)
    return dataset

TFRecordDataset

Alternatively you can convert the csv files to TFRecord files and use a TFRecordDataset. There's a thorough tutorial here.

Step 1: Convert the csv data to TFRecords data. Example code below (see read_csv from from_generator example above).

with tf.python_io.TFRecordWriter("my_train_dataset.tfrecords") as writer:
    for features, label in read_csv('my_train_dataset.csv'):
        example = tf.train.Example()
        example.features.feature[
            "features"].float_list.value.extend(features)
        example.features.feature[
            "label"].int64_list.value.append(label)
        writer.write(example.SerializeToString())

This only needs to be run once.

Step 2: Write a dataset that decodes these record files.

def parse_function(example_proto):
    features = {
        'features': tf.FixedLenFeature((n_features,), tf.float32),
        'label': tf.FixedLenFeature((), tf.int64)
    }
    parsed_features = tf.parse_single_example(example_proto, features)
    return parsed_features['features'], parsed_features['label']

def get_dataset():
    dataset = tf.data.TFRecordDataset(['data.tfrecords'])
    dataset = dataset.map(parse_function)
    return dataset

Using with Estimators

def get_inputs(batch_size, shuffle_size):
    dataset = get_dataset()  # one of the above implementations
    dataset = dataset.shuffle(shuffle_size)
    dataset = dataset.repeat()  # repeat indefinitely
    dataset = dataset.batch(batch_size)
    # prefetch data to CPU while GPU processes previous batch
    dataset = dataset.prefetch(1)
    # Also possible
    # dataset = dataset.apply(
    #     tf.contrib.data.prefetch_to_device('/gpu:0'))
    features, label = dataset.make_one_shot_iterator().get_next()
    return features, label

estimator.train(lambda: get_inputs(32, 1000), max_steps=1e7)
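
Tying this back to the original question, a hedged sketch of how one of these pipelines could feed a tf.estimator.DNNClassifier (the 'x' feature name, hidden_units and n_classes are placeholders):

def dnn_input_fn():
    features, label = get_inputs(batch_size=32, shuffle_size=1000)
    # DNNClassifier expects features keyed by feature-column name.
    return {'x': features}, label

feature_columns = [tf.feature_column.numeric_column('x', shape=(n_features,))]
classifier = tf.estimator.DNNClassifier(
    feature_columns=feature_columns,
    hidden_units=[64, 32],
    n_classes=n_classes)
classifier.train(input_fn=dnn_input_fn, max_steps=int(1e7))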

Testing in isolation

I'd highly encourage you to test your dataset independently of your estimator. Using the above get_inputs, it should be as simple as

batch_size = 4
shuffle_size = 100
features, labels = get_inputs(batch_size, shuffle_size)
with tf.Session() as sess:
    f_data, l_data = sess.run([features, labels])
print(f_data, l_data)  # or some better visualization function

Performance

Assuming you're using a GPU to run your network, unless each row of your csv file is enormous and your network is tiny, you probably won't notice a difference in performance. This is because the Estimator implementation forces data loading/preprocessing to be performed on the CPU, and prefetch means the next batch can be prepared on the CPU while the current batch is training on the GPU. The only exception to this is if you have a massive shuffle size on a dataset with a large amount of data per record, which will take some time to load a number of examples initially before running anything through the GPU.
