Streaming large training and test files into Tensorflow's DNNClassifier
Problem description
I have a huge training CSV file (709M) and a large testing CSV file (125M) that I want to send into a DNNClassifier in the context of using the high-level Tensorflow API.
It appears that the input_fn param accepted by fit and evaluate must hold all feature and label data in memory, but I currently would like to run this on my local machine, and thus expect it to run out of memory rather quickly if I read these files into memory and then process them.
I skimmed the doc on streamed-reading of data, but the sample code for reading CSVs appears to be for the low-level Tensorflow API.
And - if you'll forgive a bit of whining - it seems overly complex for the trivial use case of sending well-prepared files of training and test data into an Estimator... although perhaps that level of complexity is actually required for training and testing large volumes of data in Tensorflow?
In any case, I'd really appreciate an example of using that approach with the high-level API, if it's even possible, which I'm beginning to doubt.
After poking around, I did manage to find DNNClassifier#partial_fit, and will attempt to use it for training.
Examples of how to use this method would save me some time, though hopefully I'll stumble into the correct usage in the next few hours.
However, there doesn't seem to be a corresponding DNNClassifier#partial_evaluate... though I suspect that I could break up the testing data into smaller pieces and run DNNClassifier#evaluate successively on each batch. That might actually be a great way to do it, since I could segment the testing data into cohorts and thereby obtain per-cohort accuracy.
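If each chunk or cohort is evaluated separately, weight the per-chunk accuracies by chunk size before combining them. A plain average is only correct when every chunk holds the same number of examples. Below is a minimal pure-Python sketch. It assumes you record an (accuracy, number of examples) pair for each chunk. The function name and the sample numbers are hypothetical.

```python
def combine_chunk_accuracies(chunk_results):
    """Combine (accuracy, num_examples) pairs from separate evaluations.

    Weighting each accuracy by its chunk size gives the accuracy over all
    chunks together; a plain mean of the accuracies would not, unless every
    chunk has the same size.
    """
    total_correct = 0.0
    total_examples = 0
    for accuracy, num_examples in chunk_results:
        total_correct += accuracy * num_examples  # correct predictions in chunk
        total_examples += num_examples
    if total_examples == 0:
        raise ValueError("no examples were evaluated")
    return total_correct / total_examples


# Hypothetical per-cohort results, e.g. one evaluation per chunk of test data.
per_cohort = {"cohort_a": (0.90, 1000), "cohort_b": (0.60, 250)}
for name, (accuracy, n) in per_cohort.items():
    print(name, accuracy, n)
print("overall", combine_chunk_accuracies(per_cohort.values()))
```

The per-cohort numbers stay available for inspection. The combined figure is what a single evaluation over the whole test file would report.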
==== Update ====
Short version:
DomJack's recommendation should be the accepted answer.
However, my Mac's 16GB of RAM is enough to hold the entire 709MB training data set in memory without crashing. So, while I will use the Dataset API when I eventually deploy the app, I'm not using it yet for local dev work.
Long version:
I started by using the partial_fit API as described above, but every call emitted a warning.
So, I went to look at the source for the method here, and discovered that its complete implementation looks like this:
```python
logging.warning('The current implementation of partial_fit is not optimized'
                ' for use in a loop. Consider using fit() instead.')
return self.fit(x=x, y=y, input_fn=input_fn, steps=steps,
                batch_size=batch_size, monitors=monitors)
```
... which reminds me of this scene from Hitchhiker's Guide:
Arthur Dent: What happens if I press this button?
Ford Prefect: I wouldn't...
Arthur Dent: Oh.
Ford Prefect: What happened?
Arthur Dent: A sign lit up, saying 'Please do not press this button again'.
Which is to say: partial_fit seems to exist for the sole purpose of telling you not to use it.
Furthermore, the model generated by using partial_fit iteratively on training file chunks was much smaller than the one generated by using fit on the whole training file. This strongly suggests that only the last partial_fit training chunk actually "took".
Recommended answer
Check out the tf.data.Dataset API. There are a number of ways to create a dataset. I'll outline several, but you'll only have to implement one.
I assume each row of your csv files is n_features float values followed by a single int value.
from_generator
The easiest way to get started is to wrap a native python generator. This can have performance issues, but may be fine for your purposes.
```python
import tensorflow as tf

n_features = 10  # placeholder: set to the number of feature columns per row


def read_csv(filename):
    with open(filename, 'r') as f:
        # iterate lazily; f.readlines() would load the whole file into memory
        for line in f:
            record = line.rstrip().split(',')
            features = [float(n) for n in record[:-1]]
            label = int(record[-1])
            yield features, label


def get_dataset():
    filename = 'my_train_dataset.csv'
    generator = lambda: read_csv(filename)
    return tf.data.Dataset.from_generator(
        generator, (tf.float32, tf.int32), ((n_features,), ()))
```
This approach is highly versatile and allows you to test your generator function (read_csv) independently of TensorFlow.
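For example, you can check read_csv against a tiny throwaway file using only the standard library. This sketch repeats read_csv so that it runs on its own. It iterates over the file object, so only one line is held in memory at a time. The file contents are made up for the check.

```python
import os
import tempfile


def read_csv(filename):
    # each row: n_features floats followed by a single int label
    with open(filename, 'r') as f:
        for line in f:  # lazy: one line in memory at a time
            record = line.rstrip().split(',')
            features = [float(n) for n in record[:-1]]
            label = int(record[-1])
            yield features, label


# Write a tiny throwaway CSV (made-up values) and inspect what read_csv yields.
with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False) as tmp:
    tmp.write('0.5,1.5,2.0,1\n')
    tmp.write('3.0,4.0,5.5,0\n')
    path = tmp.name

try:
    rows = list(read_csv(path))
finally:
    os.remove(path)

print(rows)
```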
tensorflow_datasets
Supporting tensorflow versions 1.12+, tensorflow datasets is my new favourite way of creating datasets. It automatically serializes your data, collects statistics and makes other metadata available to you via info and builder objects. It can also handle automatic downloading and extraction, which makes collaboration simple.
```python
import tensorflow as tf
import tensorflow_datasets as tfds

FEATURE_SIZE = 10         # placeholder: number of feature columns
CLASS_NAMES = ['0', '1']  # placeholder: one name per label value


class MyCsvDatasetBuilder(tfds.core.GeneratorBasedBuilder):
    VERSION = tfds.core.Version("0.0.1")

    def _info(self):
        return tfds.core.DatasetInfo(
            builder=self,
            description="My dataset",
            features=tfds.features.FeaturesDict({
                "features": tfds.features.Tensor(
                    shape=(FEATURE_SIZE,), dtype=tf.float32),
                "label": tfds.features.ClassLabel(names=CLASS_NAMES),
                "index": tfds.features.Tensor(shape=(), dtype=tf.float32),
            }),
            supervised_keys=("features", "label"),
        )

    def _split_generators(self, dl_manager):
        paths = dict(
            train='/path/to/train.csv',
            test='/path/to/test.csv',
        )
        # better yet, if the csv files were originally downloaded, use
        # urls = dict(train=train_url, test=test_url)
        # paths = dl_manager.download(urls)
        # (older tfds releases also took a num_shards argument here;
        # newer releases do not accept it)
        return [
            tfds.core.SplitGenerator(
                name=tfds.Split.TRAIN,
                gen_kwargs=dict(csv_path=paths['train'])),
            tfds.core.SplitGenerator(
                name=tfds.Split.TEST,
                gen_kwargs=dict(csv_path=paths['test'])),
        ]

    def _generate_examples(self, csv_path):
        with open(csv_path, 'r') as f:
            for i, line in enumerate(f):  # stream rather than readlines()
                record = line.rstrip().split(',')
                features = [float(n) for n in record[:-1]]
                label = int(record[-1])
                # newer tfds versions expect a (unique_key, example) pair
                yield i, dict(features=features, label=label, index=i)
```
Usage:
```python
builder = MyCsvDatasetBuilder()
builder.download_and_prepare()  # will only take time to run first time
# as_supervised makes output (features, label) - good for model.fit
datasets = builder.as_dataset(as_supervised=True)
train_ds = datasets['train']
test_ds = datasets['test']
```
Wrap an index-based python function
One of the downsides of the above is that shuffling the resulting dataset with a shuffle buffer of size n requires n examples to be loaded. This will either create periodic pauses in your pipeline (large n) or result in potentially poor shuffling (small n).
```python
import tensorflow as tf  # TF 1.x API (tf.py_func, make_one_shot_iterator)

# n_features and get_epoch_size() are assumed to be defined elsewhere


def get_record(i):
    # placeholder: load the ith record using standard python and return
    # numpy arrays (float32 features of shape (n_features,), int32 label)
    raise NotImplementedError


def get_inputs(batch_size, is_training):
    def tf_map_fn(index):
        features, labels = tf.py_func(
            get_record, (index,), (tf.float32, tf.int32), stateful=False)
        features.set_shape((n_features,))
        labels.set_shape(())
        # do data augmentation here
        return features, labels

    epoch_size = get_epoch_size()  # total number of records
    dataset = tf.data.Dataset.from_tensor_slices(tf.range(epoch_size))
    if is_training:
        dataset = dataset.repeat().shuffle(epoch_size)
    dataset = dataset.map(tf_map_fn, num_parallel_calls=8)
    dataset = dataset.batch(batch_size)
    # prefetch data to CPU while GPU processes previous batch
    dataset = dataset.prefetch(1)
    # Also possible
    # dataset = dataset.apply(
    #     tf.contrib.data.prefetch_to_device('/gpu:0'))
    features, labels = dataset.make_one_shot_iterator().get_next()
    return features, labels
```
In short, we create a dataset of just the record indices (or any small record ID which we can load entirely into memory). We then do shuffling/repeating operations on this minimal dataset, then map the index to the actual data via tf.data.Dataset.map and tf.py_func. See the Using with Estimators and Testing in isolation sections below for usage. Note this requires your data to be accessible by row, so you may need to convert from csv to some other format.
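If converting the file is unappealing, one alternative is to keep the CSV as-is and build an index of byte offsets, one per row. A get_record(i)-style function can then seek straight to row i. This is a different technique from the one described above, offered as a sketch under assumptions. The helper names and the three-row sample file are hypothetical. For use inside tf.py_func, you would still need to convert the returned lists to numpy arrays of the declared dtypes. Opening the file on every call also keeps the sketch simple rather than fast.

```python
import os
import tempfile
from array import array


def build_line_offsets(path):
    """Return the starting byte offset of every line in `path`, stored
    compactly as one signed 64-bit integer per row."""
    offsets = array('q')
    offset = 0
    with open(path, 'rb') as f:
        for line in f:
            offsets.append(offset)
            offset += len(line)
    return offsets


def get_record(path, offsets, i):
    """Hypothetical row-access helper: seek to row i and parse it."""
    with open(path, 'rb') as f:
        f.seek(offsets[i])
        record = f.readline().decode('utf-8').rstrip().split(',')
    features = [float(n) for n in record[:-1]]
    label = int(record[-1])
    return features, label


# Made-up three-row file: two float features, then an int label.
with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False) as tmp:
    tmp.write('0.1,0.2,0\n1.1,1.2,1\n2.1,2.2,2\n')
    path = tmp.name

try:
    offsets = build_line_offsets(path)
    record = get_record(path, offsets, 2)
finally:
    os.remove(path)

print(len(offsets), record)
```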
TextLineDataset
You can also read the csv file directly using a tf.data.TextLineDataset.
```python
import tensorflow as tf  # TF 1.x API (tf.decode_csv)

# n_features is assumed to be defined elsewhere


def get_record_defaults():
    # one default per column; the defaults also fix each column's dtype
    zf = tf.zeros(shape=(1,), dtype=tf.float32)
    zi = tf.zeros(shape=(1,), dtype=tf.int32)
    return [zf]*n_features + [zi]


def parse_row(tf_string):
    data = tf.decode_csv(
        tf.expand_dims(tf_string, axis=0), get_record_defaults())
    features = data[:-1]
    features = tf.stack(features, axis=-1)
    label = data[-1]
    features = tf.squeeze(features, axis=0)
    label = tf.squeeze(label, axis=0)
    return features, label


def get_dataset():
    dataset = tf.data.TextLineDataset(['data.csv'])
    return dataset.map(parse_row, num_parallel_calls=8)
```
The parse_row function is a little convoluted since tf.decode_csv expects a batch. You can make it slightly simpler if you batch the dataset before parsing.
```python
def parse_batch(tf_string):
    data = tf.decode_csv(tf_string, get_record_defaults())
    features = data[:-1]
    labels = data[-1]
    features = tf.stack(features, axis=-1)
    return features, labels


def get_batched_dataset(batch_size):
    dataset = tf.data.TextLineDataset(['data.csv'])
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(parse_batch)
    return dataset
```
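The tf.stack(..., axis=-1) step turns decode_csv's one-tensor-per-column output into a batch of feature rows. Here is a pure-Python sketch of the same reshaping, using a hypothetical parse_batch_py helper and no TensorFlow.

```python
def parse_batch_py(lines, n_features):
    """Parse a batch of CSV lines the way parse_batch does, in plain Python."""
    rows = [line.rstrip().split(',') for line in lines]
    # like decode_csv: one list per column, each with one entry per line
    columns = [[float(r[c]) for r in rows] for c in range(n_features)]
    labels = [int(r[n_features]) for r in rows]
    # like tf.stack(columns, axis=-1): shape (batch_size, n_features)
    features = [list(row) for row in zip(*columns)]
    return features, labels


features, labels = parse_batch_py(['1,2,0', '3,4,1', '5,6,0'], n_features=2)
print(features)  # one row of n_features values per CSV line
print(labels)
```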
TFRecordDataset
Alternatively you can convert the csv
files to TFRecord files and use a TFRecordDataset. There's a thorough tutorial here.
Step 1: Convert the csv data to TFRecords data. Example code below (see read_csv from the from_generator example above).
```python
import tensorflow as tf  # TF 1.x API (tf.python_io)

with tf.python_io.TFRecordWriter("my_train_dataset.tfrecords") as writer:
    for features, label in read_csv('my_train_dataset.csv'):
        example = tf.train.Example()
        example.features.feature[
            "features"].float_list.value.extend(features)
        example.features.feature[
            "label"].int64_list.value.append(label)
        writer.write(example.SerializeToString())
```
This only needs to be run once.
Step 2: Write a dataset that decodes these record files.
```python
def parse_function(example_proto):
    features = {
        'features': tf.FixedLenFeature((n_features,), tf.float32),
        'label': tf.FixedLenFeature((), tf.int64)
    }
    parsed_features = tf.parse_single_example(example_proto, features)
    return parsed_features['features'], parsed_features['label']


def get_dataset():
    dataset = tf.data.TFRecordDataset(['data.tfrecords'])
    dataset = dataset.map(parse_function)
    return dataset
```
Using with Estimators
```python
def get_inputs(batch_size, shuffle_size):
    dataset = get_dataset()  # one of the above implementations
    dataset = dataset.shuffle(shuffle_size)
    dataset = dataset.repeat()  # repeat indefinitely
    dataset = dataset.batch(batch_size)
    # prefetch data to CPU while GPU processes previous batch
    dataset = dataset.prefetch(1)
    # Also possible
    # dataset = dataset.apply(
    #     tf.contrib.data.prefetch_to_device('/gpu:0'))
    features, label = dataset.make_one_shot_iterator().get_next()
    return features, label


estimator.train(lambda: get_inputs(32, 1000), max_steps=int(1e7))
```
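Because get_inputs repeats the data indefinitely, the step count alone controls how long training runs. The small sketch below converts a desired number of epochs into a step count. It assumes you know, or have counted, the number of rows in the training file. num_examples and the numbers used are hypothetical. Since repeat() comes before batch(), batches straddle epoch boundaries, so the count is ceil(epochs * num_examples / batch_size).

```python
def steps_for_epochs(num_examples, batch_size, epochs):
    """Training steps needed for `epochs` passes when repeat() precedes
    batch(), so batches may straddle epoch boundaries."""
    total = epochs * num_examples
    return (total + batch_size - 1) // batch_size  # integer ceiling division


# Hypothetical numbers: 1000 training rows, batch size 32, 2 epochs.
steps = steps_for_epochs(num_examples=1000, batch_size=32, epochs=2)
print(steps)
# estimator.train(lambda: get_inputs(32, 1000), max_steps=steps)
```

Note that Estimator.train treats max_steps as a total across calls, whereas its steps argument is incremental.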
Testing in isolation
I strongly recommend testing your dataset independently of your estimator. Using the above get_inputs, it should be as simple as:
```python
batch_size = 4
shuffle_size = 100
features, labels = get_inputs(batch_size, shuffle_size)
with tf.Session() as sess:
    f_data, l_data = sess.run([features, labels])
    print(f_data, l_data)  # or some better visualization function
```
Performance
Assuming you're using a GPU to run your network, you probably won't notice a difference in performance unless each row of your csv file is enormous and your network is tiny. This is because the Estimator implementation forces data loading/preprocessing to be performed on the CPU, and prefetch means the next batch can be prepared on the CPU while the current batch is training on the GPU. The only exception is a massive shuffle size on a dataset with a large amount of data per record. Loading that many examples takes some time before anything runs through the GPU.
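The overlap that prefetch provides can be illustrated without TensorFlow. Below is a pure-Python analogy, not how tf.data implements it: a background thread fills a bounded queue while the consumer works on the previous item. The prefetch helper is hypothetical. If the consumer stops early, the daemon producer thread is simply left blocked.

```python
import queue
import threading


def prefetch(iterable, buffer_size=1):
    """Yield items from `iterable` while a background thread prepares up to
    buffer_size items ahead of the consumer."""
    q = queue.Queue(maxsize=buffer_size)
    sentinel = object()
    errors = []

    def producer():
        try:
            for item in iterable:
                q.put(item)  # blocks while the queue is full
        except Exception as e:  # hand producer errors to the consumer
            errors.append(e)
        finally:
            q.put(sentinel)  # always signal the end of the data

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = q.get()
        if item is sentinel:
            if errors:
                raise errors[0]
            return
        yield item


total = sum(prefetch(range(10)))
print(total)  # prints 45
```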