In TensorFlow 2.0, how to feed TFRecord data to keras model?
Problem Description
I've tried to solve a classification problem whose input data has 32 features and 16 labels, using a deep neural network (DNN).
The data looks like this:
# Input data
# shape=(32,), dtype=float32
np.array([-0.9349509 , 0.24052018, -0.29364416, 1.2375807 , -0.15996791,
          0.32468656, 0.43856472, 0.00573635, -0.48105922, 0.09342893,
          0.63206947, 0.44424117, 0.31256443, 0.09699771, 0.31004518,
          0.8685253 , 0.74120486, 0.65404135, -0.4084895 , 0.07065713,
          0.33964285, -0.20022233, -0.29489437, 0.40699714, 0.27028704,
          0.74895304, -0.4846958 , 0.22371463, 0.3422047 , -0.24379562,
          0.38614622, 0.01282159])

# Label (class)
# shape=(16,), dtype=int64
np.array([0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
I want to train my NN with over 100 million examples (a very large dataset), so it's not possible to load the whole dataset into a Python array. After googling, I found that the TFRecord format can help me get around this capacity problem.
I followed the tutorial on the official TensorFlow site to write a TFRecord file, but could not find how to load the TFRecord into a Keras model.
I used a writer to write the dataset to a TFRecord file:
writer = tf.io.TFRecordWriter(filenames)
for i in range(number_of_sample):
    ...
    writer.write(serialize_example(input_data, label))
writer.close()
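With over 100 million examples, it is also common to split the data across multiple TFRecord shards rather than one file. A minimal sketch, not from the original post; shard_examples is a hypothetical generator yielding (input_data, label) pairs:

import tensorflow as tf

num_shards = 100  # hypothetical shard count
for shard in range(num_shards):
    path = 'data-{:05d}-of-{:05d}.tfrecord'.format(shard, num_shards)
    # TFRecordWriter is a context manager, so the file is closed automatically.
    with tf.io.TFRecordWriter(path) as writer:
        for input_data, label in shard_examples(shard):  # hypothetical generator
            writer.write(serialize_example(input_data, label))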
Here is the serialize_example method:
def serialize_example(input_data, label):
    """
    Creates a tf.Example message ready to be written to a file.
    """
    # Create a dictionary mapping the feature name to the tf.Example-compatible data type.
    feature = {
        'feature': tf.train.Feature(float_list=tf.train.FloatList(value=input_data)),
        'label'  : tf.train.Feature(int64_list=tf.train.Int64List(value=label))
    }
    # Create a Features message using tf.train.Example.
    example_proto = tf.train.Example(
        features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()
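As a quick sanity check, a serialized record can be parsed back with the protobuf API. A sketch, assuming input_data and label hold one sample as shown above:

serialized = serialize_example(input_data, label)
example = tf.train.Example.FromString(serialized)          # protobuf round-trip
print(example.features.feature['feature'].float_list.value[:4])  # first few feature values
print(example.features.feature['label'].int64_list.value)        # the one-hot label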
TFRecord Reading Procedure
After writing the TFRecord, I figured out how to read the files back by parsing the serialized records:
dataset = tf.data.TFRecordDataset(filenames=[filenames])
parsed_dataset = dataset.map(_parse_function, num_parallel_calls=8)
final_dataset = parsed_dataset.shuffle(buffer_size=number_of_sample).batch(10)
print(parsed_dataset)
# <ParallelMapDataset shapes: {feature: (32,), label: (16,)},
#  types: {feature: tf.float32, label: tf.int64}>
for parsed_record in parsed_dataset.take(1):
    print(repr(parsed_record))
'''
{'feature': <tf.Tensor: id=10730, shape=(32,), dtype=float32, numpy=
array([ 0.3584828 , 0.43238872, 0.84813404, -0.23866414, -0.3381694 ,
-0.6825514 , -0.20499012, -0.60198826, 0.12879704, -0.6152373 ,
0.21901904, 0.10998161, 0.04357208, -0.19996743, -0.24080099,
-0.6282675 , 0.57822317, 0.10296232, -0.25011575, -0.3454151 ,
0.6235647 , -0.12194595, -0.18114032, -1.4484204 , -0.11394399,
-0.20868362, -0.00653742, 0.677903 , 0.09619896, -0.6428113 ,
-0.59125495, 0.22995417], dtype=float32)>,
'label': <tf.Tensor: id=10731, shape=(16,), dtype=int64,
numpy=array([0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0])>}
'''
Here is the _parse_function method:
# Create a description of the features.
feature_description = {
    'feature': tf.io.FixedLenFeature([32, ], tf.float32),
    'label'  : tf.io.FixedLenFeature([16, ], tf.int64)
}

def _parse_function(example_proto):
    # Parse the input `tf.Example` proto using the dictionary above.
    return tf.io.parse_single_example(example_proto, feature_description)
Feed TFRecord (Training)
So far everything seemed to flow smoothly, but when I tried to feed this dataset into a Keras layer, it produced an error.
Model definition and training:
inputs = keras.Input(shape=(32, ), name='feature')
x = layers.Dense(1024, activation='linear', name='dense_input')(inputs)
outputs = layers.Dense(expected_output, activation='softmax', name='label')(x)

model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
              loss='categorical_crossentropy',
              metrics=['accuracy', 'categorical_crossentropy'])

model.fit(final_dataset)
The output was as follows:
1/Unknown - 0s 15ms/step
---------------------------------------------------------------------------
IndexError Traceback (most recent call last)
<ipython-input-41-bb3547f32c4a> in <module>()
8 loss='categorical_crossentropy', metrics=['accuracy','categorical_crossentropy'])
9
---> 10 model.fit(final_dataset )
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training.py in fit(self, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, max_queue_size, workers, use_multiprocessing, **kwargs)
726 max_queue_size=max_queue_size,
727 workers=workers,
--> 728 use_multiprocessing=use_multiprocessing)
729
730 def evaluate(self,
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py in fit(self, model, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, **kwargs)
322 mode=ModeKeys.TRAIN,
323 training_context=training_context,
--> 324 total_epochs=epochs)
325 cbks.make_logs(model, epoch_logs, training_result, ModeKeys.TRAIN)
326
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py in run_one_epoch(model, iterator, execution_function, dataset_size, batch_size, strategy, steps_per_epoch, num_samples, mode, training_context, total_epochs)
121 step=step, mode=mode, size=current_batch_size) as batch_logs:
122 try:
--> 123 batch_outs = execution_function(iterator)
124 except (StopIteration, errors.OutOfRangeError):
125 # TODO(kaftan): File bug about tf function and errors.OutOfRangeError?
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in execution_function(input_fn)
84 # `numpy` translates Tensors to values in Eager mode.
85 return nest.map_structure(_non_none_constant_value,
---> 86 distributed_function(input_fn))
87
88 return execution_function
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in __call__(self, *args, **kwds)
455
456 tracing_count = self._get_tracing_count()
--> 457 result = self._call(*args, **kwds)
458 if tracing_count == self._get_tracing_count():
459 self._call_counter.called_without_tracing()
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in _call(self, *args, **kwds)
501 # This is the first call of __call__, so we have to initialize.
502 initializer_map = object_identity.ObjectIdentityDictionary()
--> 503 self._initialize(args, kwds, add_initializers_to=initializer_map)
504 finally:
505 # At this point we know that the initialization is complete (or less
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in _initialize(self, args, kwds, add_initializers_to)
406 self._concrete_stateful_fn = (
407 self._stateful_fn._get_concrete_function_internal_garbage_collected( # pylint: disable=protected-access
--> 408 *args, **kwds))
409
410 def invalid_creator_scope(*unused_args, **unused_kwds):
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _get_concrete_function_internal_garbage_collected(self, *args, **kwargs)
1846 if self.input_signature:
1847 args, kwargs = None, None
-> 1848 graph_function, _, _ = self._maybe_define_function(args, kwargs)
1849 return graph_function
1850
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _maybe_define_function(self, args, kwargs)
2148 graph_function = self._function_cache.primary.get(cache_key, None)
2149 if graph_function is None:
-> 2150 graph_function = self._create_graph_function(args, kwargs)
2151 self._function_cache.primary[cache_key] = graph_function
2152 return graph_function, args, kwargs
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _create_graph_function(self, args, kwargs, override_flat_arg_shapes)
2039 arg_names=arg_names,
2040 override_flat_arg_shapes=override_flat_arg_shapes,
-> 2041 capture_by_value=self._capture_by_value),
2042 self._function_attributes,
2043 # Tell the ConcreteFunction to clean up its graph once it goes out of
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/framework/func_graph.py in func_graph_from_py_func(name, python_func, args, kwargs, signature, func_graph, autograph, autograph_options, add_control_dependencies, arg_names, op_return_value, collections, capture_by_value, override_flat_arg_shapes)
913 converted_func)
914
--> 915 func_outputs = python_func(*func_args, **func_kwargs)
916
917 # invariant: `func_outputs` contains only Tensors, CompositeTensors,
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in wrapped_fn(*args, **kwds)
356 # __wrapped__ allows AutoGraph to swap in a converted function. We give
357 # the function a weak reference to itself to avoid a reference cycle.
--> 358 return weak_wrapped_fn().__wrapped__(*args, **kwds)
359 weak_wrapped_fn = weakref.ref(wrapped_fn)
360
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in distributed_function(input_iterator)
71 strategy = distribution_strategy_context.get_strategy()
72 outputs = strategy.experimental_run_v2(
---> 73 per_replica_function, args=(model, x, y, sample_weights))
74 # Out of PerReplica outputs reduce or pick values to return.
75 all_outputs = dist_utils.unwrap_output_dict(
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in experimental_run_v2(self, fn, args, kwargs)
758 fn = autograph.tf_convert(fn, ag_ctx.control_status_ctx(),
759 convert_by_default=False)
--> 760 return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
761
762 def reduce(self, reduce_op, value, axis):
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in call_for_each_replica(self, fn, args, kwargs)
1785 kwargs = {}
1786 with self._container_strategy().scope():
-> 1787 return self._call_for_each_replica(fn, args, kwargs)
1788
1789 def _call_for_each_replica(self, fn, args, kwargs):
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in _call_for_each_replica(self, fn, args, kwargs)
2130 self._container_strategy(),
2131 replica_id_in_sync_group=constant_op.constant(0, dtypes.int32)):
-> 2132 return fn(*args, **kwargs)
2133
2134 def _reduce_to(self, reduce_op, value, destinations):
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/autograph/impl/api.py in wrapper(*args, **kwargs)
290 def wrapper(*args, **kwargs):
291 with ag_ctx.ControlStatusCtx(status=ag_ctx.Status.DISABLED):
--> 292 return func(*args, **kwargs)
293
294 if inspect.isfunction(func) or inspect.ismethod(func):
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in train_on_batch(model, x, y, sample_weight, class_weight, reset_metrics)
262 y,
263 sample_weights=sample_weights,
--> 264 output_loss_metrics=model._output_loss_metrics)
265
266 if reset_metrics:
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in train_on_batch(model, inputs, targets, sample_weights, output_loss_metrics)
309 sample_weights=sample_weights,
310 training=True,
--> 311 output_loss_metrics=output_loss_metrics))
312 if not isinstance(outs, list):
313 outs = [outs]
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in _process_single_batch(model, inputs, targets, output_loss_metrics, sample_weights, training)
250 output_loss_metrics=output_loss_metrics,
251 sample_weights=sample_weights,
--> 252 training=training))
253 if total_loss is None:
254 raise ValueError('The model cannot be run '
/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in _model_loss(model, inputs, targets, output_loss_metrics, sample_weights, training)
164
165 if hasattr(loss_fn, 'reduction'):
--> 166 per_sample_losses = loss_fn.call(targets[i], outs[i])
167 weighted_losses = losses_utils.compute_weighted_loss(
168 per_sample_losses,
IndexError: list index out of range
I don't know what this list refers to.
Recommended Answer
I'm doing something similar in TF 2.0 with a couple of differences that may address your issues. Separate parsed_record into features and label:
feature, label = parsed_record['feature'], parsed_record['label']
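Equivalently, that separation can happen inside the parse function itself, so the dataset yields (features, label) tuples rather than a dict, which is the structure model.fit matches against inputs and targets. A minimal sketch using the question's feature_description:

def _parse_function(example_proto):
    parsed = tf.io.parse_single_example(example_proto, feature_description)
    # Return a (features, label) tuple instead of a dict so Keras
    # can pair inputs with targets.
    return parsed['feature'], parsed['label']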
To continue getting batches from a dataset, use ds.repeat. Note that repeat is chained onto the dataset (it takes an optional count, not a dataset):

ds = ds.shuffle(buffer_size=number_of_sample).batch(batch_size).repeat()
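Since repeat() without a count makes the dataset effectively infinite, model.fit no longer knows where an epoch ends, so pass steps_per_epoch explicitly. A small sketch (the epoch count is illustrative):

model.fit(ds,
          epochs=10,  # illustrative
          steps_per_epoch=number_of_sample // batch_size)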
My full input pipeline looks like this:
def _parse_function_same_side(example_proto):
    """Extracts features and labels.

    Args:
      example_proto: tf.Example protocol
    Returns:
      A `tuple` `(features, labels)`:
        features: A 2D tensor representing the features
        labels: A tensor with the corresponding labels.
    """
    feature_description = {
        "features": tf.io.FixedLenFeature(4, tf.int64),
        "label": tf.io.FixedLenFeature(1, tf.int64)
    }
    parsed_features = tf.io.parse_single_example(example_proto, feature_description)
    features = parsed_features['features']
    # hero_vocab is defined elsewhere in my project
    labels = tf.one_hot(parsed_features['label'], depth=len(hero_vocab))
    return features, labels

def _input_fn(input_filenames, num_epochs=None,
              shuffle=True, batch_size=50, compression_type=""):
    ds = tf.data.TFRecordDataset(input_filenames, compression_type=compression_type)
    ds = ds.map(_parse_function_same_side)
    # only shuffle if shuffle flag is set
    if shuffle:
        ds = ds.shuffle(10000)
    # group examples into batches of batch_size
    ds = ds.batch(batch_size)
    # make sure you can repeatedly take batches from the TFRecord
    ds = ds.repeat()
    # Return the dataset.
    return ds
After this I just feed the dataset directly to my model.
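For completeness, a hypothetical end-to-end call (the filename and step counts are illustrative, not from the original answer):

train_ds = _input_fn(["train.tfrecord"], batch_size=50)
model.fit(train_ds,
          epochs=5,               # illustrative
          steps_per_epoch=1000)   # illustrative: dataset_size // batch_size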