Streaming large training and test files into Tensorflow's DNNClassifier

Problem description

I have a huge training CSV file (709M) and a large testing CSV file (125M) that I want to send into a DNNClassifier in the context of using the high-level Tensorflow API.

It appears that the input_fn param accepted by fit and evaluate must hold all feature and label data in memory, but I currently would like to run this on my local machine, and thus expect it to run out of memory rather quickly if I read these files into memory and then process them.
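
For concreteness, here is a minimal sketch of the kind of in-memory input_fn I mean (the 'x' feature key is just an illustrative name); the whole file gets materialized as numpy arrays up front, which is exactly what won't scale here:

import numpy as np
import tensorflow as tf

def in_memory_input_fn(csv_path):
    data = np.loadtxt(csv_path, delimiter=',')  # loads the entire csv into RAM
    features = {'x': tf.constant(data[:, :-1], dtype=tf.float32)}
    labels = tf.constant(data[:, -1].astype(np.int32))
    return features, labels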

I skimmed the doc on streamed-reading of data, but the sample code for reading CSVs appears to be for the low-level Tensorflow API.

And - if you'll forgive a bit of whining - it seems overly-complex for the trivial use case of sending well-prepared files of training and test data into an Estimator ... although, perhaps that level of complexity is actually required for training and testing large volumes of data in Tensorflow?

In any case, I'd really appreciate an example of using that approach with the high-level API, if it's even possible, which I'm beginning to doubt.

After poking around, I did manage to find DNNClassifier#partial_fit, and will attempt to use it for training.

Examples of how to use this method would save me some time, though hopefully I'll stumble into the correct usage in the next few hours.

However, there doesn't seem to be a corresponding DNNClassifier#partial_evaluate ... though I suspect that I could break-up the testing data into smaller pieces and run DNNClassifier#evaluate successively on each batch, which might actually be a great way to do it since I could segment the testing data into cohorts, and thereby obtain per-cohort accuracy.
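
As a rough sketch of that chunked-evaluation idea (assuming classifier is an already-trained DNNClassifier, that evaluate accepts in-memory x/y arrays the same way fit does, and that load_cohort is a hypothetical helper returning one cohort of the test data as numpy arrays):

for cohort_name in ['cohort_a', 'cohort_b', 'cohort_c']:
    # load_cohort is hypothetical: returns one cohort's rows as numpy arrays
    x_test, y_test = load_cohort(cohort_name)
    metrics = classifier.evaluate(x=x_test, y=y_test)
    print(cohort_name, metrics)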

==== Update ====

Short version:

  1. DomJack's recommendation should be the accepted answer.

However, my Mac's 16GB of RAM is enough to hold the entire 709Mb training data set in memory without crashing. So, while I will use the DataSets feature when I eventually deploy the app, I'm not using it yet for local dev work.

Long version:

I started by using the partial_fit API as described above, but upon every use it emitted a warning.

So, I went to look at the source for the method here, and discovered that its complete implementation looks like this:

logging.warning('The current implementation of partial_fit is not optimized'
                ' for use in a loop. Consider using fit() instead.')
return self.fit(x=x, y=y, input_fn=input_fn, steps=steps,
                batch_size=batch_size, monitors=monitors)

... which reminds me of this scene from Hitchhiker's Guide:

Arthur Dent: What happens if I press this button?

Ford Prefect: I wouldn't -

Arthur Dent: Oh.

Ford Prefect: What happened?

Arthur Dent: A sign lit up, saying 'Please do not press this button again'.

Which is to say: partial_fit seems to exist for the sole purpose of telling you not to use it.

Furthermore, the model generated by using partial_fit iteratively on training file chunks was much smaller than the one generated by using fit on the whole training file, which strongly suggests that only the last partial_fit training chunk actually "took".

Recommended answer

Check out the tf.data.Dataset API. There are a number of ways to create a dataset. I'll outline five - but you'll only have to implement one.

I assume each row of your csv files is n_features float values followed by a single int value.
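
For example, with n_features = 3, a couple of rows might look like this (made-up values):

0.25,1.70,-3.10,2
0.94,0.05,5.60,0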

The easiest way to get started is to wrap a native python generator. This can have performance issues, but may be fine for your purposes.

import tensorflow as tf

def read_csv(filename):
    with open(filename, 'r') as f:
        for line in f.readlines():
            record = line.rstrip().split(',')
            features = [float(n) for n in record[:-1]]
            label = int(record[-1])
            yield features, label

def get_dataset():
    filename = 'my_train_dataset.csv'
    generator = lambda: read_csv(filename)
    return tf.data.Dataset.from_generator(
        generator, (tf.float32, tf.int32), ((n_features,), ()))

This approach is highly versatile and allows you to test your generator function (read_csv) independently of TensorFlow.
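
For instance, the generator can be exercised on its own, with no TensorFlow involved (file name as in the snippet above):

for features, label in read_csv('my_train_dataset.csv'):
    print(len(features), label)  # sanity-check the first record
    break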

Supporting tensorflow versions 1.12+, tensorflow datasets is my new favourite way of creating datasets. It automatically serializes your data, collects statistics, and makes other metadata available to you via info and builder objects. It can also handle automatic downloading and extraction, making collaboration simple.

import tensorflow_datasets as tfds

class MyCsvDatasetBuilder(tfds.core.GeneratorBasedBuilder):
  VERSION = tfds.core.Version("0.0.1")

  def _info(self):
    return tfds.core.DatasetInfo(
        builder=self,
        description=(
            "My dataset"),
        features=tfds.features.FeaturesDict({
            "features": tfds.features.Tensor(
              shape=(FEATURE_SIZE,), dtype=tf.float32),
            "label": tfds.features.ClassLabel(
                names=CLASS_NAMES),
            "index": tfds.features.Tensor(shape=(), dtype=tf.float32)
        }),
        supervised_keys=("features", "label"),
    )

  def _split_generators(self, dl_manager):
    paths = dict(
      train='/path/to/train.csv',
      test='/path/to/test.csv',
    )
    # better yet, if the csv files were originally downloaded, use
    # urls = dict(train=train_url, test=test_url)
    # paths = dl_manager.download(urls)
    return [
        tfds.core.SplitGenerator(
            name=tfds.Split.TRAIN,
            num_shards=10,
            gen_kwargs=dict(csv_path=paths['train'])),
        tfds.core.SplitGenerator(
            name=tfds.Split.TEST,
            num_shards=2,
            gen_kwargs=dict(csv_path=paths['test']))
    ]

  def _generate_examples(self, csv_path):
    with open(csv_path, 'r') as f:
        for i, line in enumerate(f.readlines()):
            record = line.rstrip().split(',')
            features = [float(n) for n in record[:-1]]
            label = int(record[-1])
            yield dict(features=features, label=label, index=i)

Usage:

builder = MyCsvDatasetBuilder()
builder.download_and_prepare()  # will only take time to run first time
# as_supervised makes output (features, label) - good for model.fit
datasets = builder.as_dataset(as_supervised=True)

train_ds = datasets['train']
test_ds = datasets['test']

Wrap an index-based python function

One of the downsides of the above is that shuffling the resulting dataset with a shuffle buffer of size n requires n examples to be loaded. This will either create periodic pauses in your pipeline (large n) or result in potentially poor shuffling (small n).

def get_record(i):
    # load the ith record using standard python, return numpy arrays
    return features, labels

def get_inputs(batch_size, is_training):

    def tf_map_fn(index):
        features, labels = tf.py_func(
            get_record, (index,), (tf.float32, tf.int32), stateful=False)
        features.set_shape((n_features,))
        labels.set_shape(())
        # do data augmentation here
        return features, labels

    epoch_size = get_epoch_size()
    dataset = tf.data.Dataset.from_tensor_slices(tf.range(epoch_size))
    if is_training:
        dataset = dataset.repeat().shuffle(epoch_size)
    dataset = dataset.map(tf_map_fn, num_parallel_calls=8)
    dataset = dataset.batch(batch_size)
    # prefetch data to CPU while GPU processes previous batch
    dataset = dataset.prefetch(1)
    # Also possible
    # dataset = dataset.apply(
    #     tf.contrib.data.prefetch_to_device('/gpu:0'))
    features, labels = dataset.make_one_shot_iterator().get_next()
    return features, labels

In short, we create a dataset just of the record indices (or any small record ID which we can load entirely into memory). We then do shuffling/repeating operations on this minimal dataset, then map the index to the actual data via tf.data.Dataset.map and tf.py_func. See the Using with Estimators and Testing in isolation sections below for usage. Note this requires your data to be accessible by row, so you may need to convert from csv to some other format.
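
As one purely illustrative way of making the data row-accessible, get_record and get_epoch_size could be backed by a memory-mapped .npy file produced once from the csv (so only the requested row is read from disk); the file name and shape here are assumptions:

import numpy as np

# assumes the csv was converted once to a float32 array of shape
# (num_records, n_features + 1) and saved with np.save
data = np.load('my_train_dataset.npy', mmap_mode='r')

def get_epoch_size():
    return data.shape[0]

def get_record(i):
    row = np.asarray(data[i], dtype=np.float32)
    return row[:-1], np.int32(row[-1])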

You can also read the csv file directly using a tf.data.TextLineDataset.

def get_record_defaults():
  zf = tf.zeros(shape=(1,), dtype=tf.float32)
  zi = tf.ones(shape=(1,), dtype=tf.int32)
  return [zf]*n_features + [zi]

def parse_row(tf_string):
    data = tf.decode_csv(
        tf.expand_dims(tf_string, axis=0), get_record_defaults())
    features = data[:-1]
    features = tf.stack(features, axis=-1)
    label = data[-1]
    features = tf.squeeze(features, axis=0)
    label = tf.squeeze(label, axis=0)
    return features, label

def get_dataset():
    dataset = tf.data.TextLineDataset(['data.csv'])
    return dataset.map(parse_row, num_parallel_calls=8)

The parse_row function is a little convoluted since tf.decode_csv expects a batch. You can make it slightly simpler if you batch the dataset before parsing.

def parse_batch(tf_string):
    data = tf.decode_csv(tf_string, get_record_defaults())
    features = data[:-1]
    labels = data[-1]
    features = tf.stack(features, axis=-1)
    return features, labels

def get_batched_dataset(batch_size):
    dataset = tf.data.TextLineDataset(['data.csv'])
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(parse_batch)
    return dataset
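
If your csv files happen to have a header row (the question doesn't say, so this is only if needed), it can be dropped before parsing:

dataset = tf.data.TextLineDataset(['data.csv']).skip(1)  # skip the header line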

TFRecordDataset

Alternatively you can convert the csv files to TFRecord files and use a TFRecordDataset. There's a thorough tutorial here.

Step 1: Convert the csv data to TFRecords data. Example code below (see read_csv from the from_generator example above).

with tf.python_io.TFRecordWriter("my_train_dataset.tfrecords") as writer:
    for features, label in read_csv('my_train_dataset.csv'):
        example = tf.train.Example()
        example.features.feature[
            "features"].float_list.value.extend(features)
        example.features.feature[
            "label"].int64_list.value.append(label)
        writer.write(example.SerializeToString())

This only needs to be run once.

Step 2: Write a dataset that decodes these record files.

def parse_function(example_proto):
    features = {
        'features': tf.FixedLenFeature((n_features,), tf.float32),
        'label': tf.FixedLenFeature((), tf.int64)
    }
    parsed_features = tf.parse_single_example(example_proto, features)
    return parsed_features['features'], parsed_features['label']

def get_dataset():
    dataset = tf.data.TFRecordDataset(['data.tfrecords'])
    dataset = dataset.map(parse_function)
    return dataset

Using with Estimators

def get_inputs(batch_size, shuffle_size):
    dataset = get_dataset()  # one of the above implementations
    dataset = dataset.shuffle(shuffle_size)
    dataset = dataset.repeat()  # repeat indefinitely
    dataset = dataset.batch(batch_size)
    # prefetch data to CPU while GPU processes previous batch
    dataset = dataset.prefetch(1)
    # Also possible
    # dataset = dataset.apply(
    #     tf.contrib.data.prefetch_to_device('/gpu:0'))
    features, label = dataset.make_one_shot_iterator().get_next()
    return features, label

estimator.train(lambda: get_inputs(32, 1000), max_steps=1e7)
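
Evaluation can reuse the same dataset code. As a sketch, assuming get_test_dataset wraps the test csv with any of the implementations above (no repeat or shuffle for evaluation):

def get_eval_inputs(batch_size):
    dataset = get_test_dataset()  # hypothetical: like get_dataset, but for the test csv
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(1)
    return dataset.make_one_shot_iterator().get_next()

estimator.evaluate(lambda: get_eval_inputs(32))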

Testing in isolation

I'd highly recommend testing your dataset independently of your estimator. Using the above get_inputs, it should be as simple as

batch_size = 4
shuffle_size = 100
features, labels = get_inputs(batch_size, shuffle_size)
with tf.Session() as sess:
    f_data, l_data = sess.run([features, labels])
print(f_data, l_data)  # or some better visualization function

Performance

Assuming you're using a GPU to run your network, unless each row of your csv file is enormous and your network is tiny you probably won't notice a difference in performance. This is because the Estimator implementation forces data loading/preprocessing to be performed on the CPU, and prefetch means the next batch can be prepared on the CPU as the current batch is training on the GPU. The only exception to this is if you have a massive shuffle size on a dataset with a large amount of data per record, which will take some time to load a number of examples initially before running anything through the GPU.
