In TensorFlow 2.0, how to feed TFRecord data to a Keras model?

Question

I've tried to solve a classification problem whose input data has 32 features and 16 labels, using a deep neural network (DNN).

They look like this:

# Input data
shape=(32,), dtype=float32, 
np.array([-0.9349509 ,  0.24052018, -0.29364416,  1.2375807 , -0.15996791,
        0.32468656,  0.43856472,  0.00573635, -0.48105922,  0.09342893,
        0.63206947,  0.44424117,  0.31256443,  0.09699771,  0.31004518,
        0.8685253 ,  0.74120486,  0.65404135, -0.4084895 ,  0.07065713,
        0.33964285, -0.20022233, -0.29489437,  0.40699714,  0.27028704,
        0.74895304, -0.4846958 ,  0.22371463,  0.3422047 , -0.24379562,
        0.38614622,  0.01282159])
# Label (Class)
shape=(16,), dtype=int64, np.array([0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

I want to train my NN on more than 100 million samples (a very large dataset), so it is not possible to load the whole dataset into a Python array.
After googling, I found that the TFRecord format can help me get around this capacity problem.

I followed the tutorial on the official TensorFlow site to write a TFRecord file, but could not find out how to load the TFRecord into a Keras model.

I used a writer to write the dataset to a TFRecord file.

import tensorflow as tf

# `filenames` holds the path of the TFRecord file to write.
writer = tf.io.TFRecordWriter(filenames)

for i in range(number_of_sample):
    ...
    writer.write(serialize_example(input_data, label))
writer.close()

The serialize_example method follows:

def serialize_example(input_data, label):
    """
    Creates a tf.Example message ready to be written to a file.
    """
    # Create a dictionary mapping the feature name to the tf.Example-compatible data type.
    feature = {
        'feature': tf.train.Feature(float_list=tf.train.FloatList(value=input_data)),
        'label' : tf.train.Feature(int64_list=tf.train.Int64List(value=label))
    }

    # Create a Features message using tf.train.Example.
    example_proto = tf.train.Example(
        features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()
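
A quick way to check that a record was serialized as intended is to decode it again before writing millions of them. The sketch below is only an illustration: the sample values are made up, and it uses `tf.train.Example.FromString` to read the bytes back without any `tf.data` pipeline.

```python
import numpy as np
import tensorflow as tf


def serialize_example(input_data, label):
    """Same layout as above: a float 'feature' list and an int64 'label' list."""
    feature = {
        "feature": tf.train.Feature(float_list=tf.train.FloatList(value=input_data)),
        "label": tf.train.Feature(int64_list=tf.train.Int64List(value=label)),
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    return example.SerializeToString()


# Made-up sample: 32 random features and a one-hot label for class 5.
input_data = np.random.uniform(-1.0, 1.0, size=32).astype(np.float32)
label = np.eye(16, dtype=np.int64)[5]

serialized = serialize_example(input_data, label)

# Decode the serialized bytes back into a tf.train.Example message.
decoded = tf.train.Example.FromString(serialized)
decoded_feature = list(decoded.features.feature["feature"].float_list.value)
decoded_label = list(decoded.features.feature["label"].int64_list.value)

print(len(decoded_feature), decoded_label)
```

If the decoded lengths or values differ from the inputs, the problem is in the writing step rather than in the reading pipeline.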

TFRecord Reading Procedure

After writing the TFRecord file, I worked out how to read it back by parsing the serialized strings.

dataset = tf.data.TFRecordDataset(filenames=[filenames])
parsed_dataset = dataset.map(_parse_function, num_parallel_calls=8)
final_dataset = parsed_dataset.shuffle(buffer_size=number_of_sample).batch(10)

print(parsed_dataset)
# <ParallelMapDataset shapes: {feature: (32,), label: (16,)},
#  types: {feature: tf.float32, label: tf.int64}>

for parsed_record in parsed_dataset.take(1):
    print(repr(parsed_record))
'''
{'feature': <tf.Tensor: id=10730, shape=(32,), dtype=float32, numpy=
array([ 0.3584828 ,  0.43238872,  0.84813404, -0.23866414, -0.3381694 ,
       -0.6825514 , -0.20499012, -0.60198826,  0.12879704, -0.6152373 ,
        0.21901904,  0.10998161,  0.04357208, -0.19996743, -0.24080099,
       -0.6282675 ,  0.57822317,  0.10296232, -0.25011575, -0.3454151 ,
        0.6235647 , -0.12194595, -0.18114032, -1.4484204 , -0.11394399,
       -0.20868362, -0.00653742,  0.677903  ,  0.09619896, -0.6428113 ,
       -0.59125495,  0.22995417], dtype=float32)>, 
'label': <tf.Tensor: id=10731, shape=(16,), dtype=int64, 
numpy=array([0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0])>}
'''

The _parse_function method follows:

# Create a description of the features.
feature_description = {
    'feature': tf.io.FixedLenFeature([32, ], tf.float32),
    'label' : tf.io.FixedLenFeature([16, ], tf.int64)
}

def _parse_function(example_proto):
  # Parse the input `tf.Example` proto using the dictionary above.
  return tf.io.parse_single_example(example_proto, feature_description)

Feed TFRecord (Training)

So far everything seemed to go smoothly, but when I tried to feed this dataset into the Keras model, it raised an error.

Model definition and training

from tensorflow import keras
from tensorflow.keras import layers

inputs = keras.Input(shape=(32, ), name='feature')
x = layers.Dense(1024, activation='linear', name='dense_input')(inputs)
outputs = layers.Dense(expected_output, activation='softmax', name='label')(x)

model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
              loss='categorical_crossentropy',
              metrics=['accuracy', 'categorical_crossentropy'])

model.fit(final_dataset)

The output is:

     1/Unknown - 0s 15ms/step
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-41-bb3547f32c4a> in <module>()
      8 loss='categorical_crossentropy', metrics=['accuracy','categorical_crossentropy'])
      9 
---> 10 model.fit(final_dataset )

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training.py in fit(self, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, max_queue_size, workers, use_multiprocessing, **kwargs)
    726         max_queue_size=max_queue_size,
    727         workers=workers,
--> 728         use_multiprocessing=use_multiprocessing)
    729 
    730   def evaluate(self,

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py in fit(self, model, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, **kwargs)
    322                 mode=ModeKeys.TRAIN,
    323                 training_context=training_context,
--> 324                 total_epochs=epochs)
    325             cbks.make_logs(model, epoch_logs, training_result, ModeKeys.TRAIN)
    326 

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py in run_one_epoch(model, iterator, execution_function, dataset_size, batch_size, strategy, steps_per_epoch, num_samples, mode, training_context, total_epochs)
    121         step=step, mode=mode, size=current_batch_size) as batch_logs:
    122       try:
--> 123         batch_outs = execution_function(iterator)
    124       except (StopIteration, errors.OutOfRangeError):
    125         # TODO(kaftan): File bug about tf function and errors.OutOfRangeError?

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in execution_function(input_fn)
     84     # `numpy` translates Tensors to values in Eager mode.
     85     return nest.map_structure(_non_none_constant_value,
---> 86                               distributed_function(input_fn))
     87 
     88   return execution_function

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in __call__(self, *args, **kwds)
    455 
    456     tracing_count = self._get_tracing_count()
--> 457     result = self._call(*args, **kwds)
    458     if tracing_count == self._get_tracing_count():
    459       self._call_counter.called_without_tracing()

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in _call(self, *args, **kwds)
    501       # This is the first call of __call__, so we have to initialize.
    502       initializer_map = object_identity.ObjectIdentityDictionary()
--> 503       self._initialize(args, kwds, add_initializers_to=initializer_map)
    504     finally:
    505       # At this point we know that the initialization is complete (or less

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in _initialize(self, args, kwds, add_initializers_to)
    406     self._concrete_stateful_fn = (
    407         self._stateful_fn._get_concrete_function_internal_garbage_collected(  # pylint: disable=protected-access
--> 408             *args, **kwds))
    409 
    410     def invalid_creator_scope(*unused_args, **unused_kwds):

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _get_concrete_function_internal_garbage_collected(self, *args, **kwargs)
   1846     if self.input_signature:
   1847       args, kwargs = None, None
-> 1848     graph_function, _, _ = self._maybe_define_function(args, kwargs)
   1849     return graph_function
   1850 

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _maybe_define_function(self, args, kwargs)
   2148         graph_function = self._function_cache.primary.get(cache_key, None)
   2149         if graph_function is None:
-> 2150           graph_function = self._create_graph_function(args, kwargs)
   2151           self._function_cache.primary[cache_key] = graph_function
   2152         return graph_function, args, kwargs

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _create_graph_function(self, args, kwargs, override_flat_arg_shapes)
   2039             arg_names=arg_names,
   2040             override_flat_arg_shapes=override_flat_arg_shapes,
-> 2041             capture_by_value=self._capture_by_value),
   2042         self._function_attributes,
   2043         # Tell the ConcreteFunction to clean up its graph once it goes out of

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/framework/func_graph.py in func_graph_from_py_func(name, python_func, args, kwargs, signature, func_graph, autograph, autograph_options, add_control_dependencies, arg_names, op_return_value, collections, capture_by_value, override_flat_arg_shapes)
    913                                           converted_func)
    914 
--> 915       func_outputs = python_func(*func_args, **func_kwargs)
    916 
    917       # invariant: `func_outputs` contains only Tensors, CompositeTensors,

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in wrapped_fn(*args, **kwds)
    356         # __wrapped__ allows AutoGraph to swap in a converted function. We give
    357         # the function a weak reference to itself to avoid a reference cycle.
--> 358         return weak_wrapped_fn().__wrapped__(*args, **kwds)
    359     weak_wrapped_fn = weakref.ref(wrapped_fn)
    360 

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in distributed_function(input_iterator)
     71     strategy = distribution_strategy_context.get_strategy()
     72     outputs = strategy.experimental_run_v2(
---> 73         per_replica_function, args=(model, x, y, sample_weights))
     74     # Out of PerReplica outputs reduce or pick values to return.
     75     all_outputs = dist_utils.unwrap_output_dict(

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in experimental_run_v2(self, fn, args, kwargs)
    758       fn = autograph.tf_convert(fn, ag_ctx.control_status_ctx(),
    759                                 convert_by_default=False)
--> 760       return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    761 
    762   def reduce(self, reduce_op, value, axis):

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in call_for_each_replica(self, fn, args, kwargs)
   1785       kwargs = {}
   1786     with self._container_strategy().scope():
-> 1787       return self._call_for_each_replica(fn, args, kwargs)
   1788 
   1789   def _call_for_each_replica(self, fn, args, kwargs):

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in _call_for_each_replica(self, fn, args, kwargs)
   2130         self._container_strategy(),
   2131         replica_id_in_sync_group=constant_op.constant(0, dtypes.int32)):
-> 2132       return fn(*args, **kwargs)
   2133 
   2134   def _reduce_to(self, reduce_op, value, destinations):

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/autograph/impl/api.py in wrapper(*args, **kwargs)
    290   def wrapper(*args, **kwargs):
    291     with ag_ctx.ControlStatusCtx(status=ag_ctx.Status.DISABLED):
--> 292       return func(*args, **kwargs)
    293 
    294   if inspect.isfunction(func) or inspect.ismethod(func):

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in train_on_batch(model, x, y, sample_weight, class_weight, reset_metrics)
    262       y,
    263       sample_weights=sample_weights,
--> 264       output_loss_metrics=model._output_loss_metrics)
    265 
    266   if reset_metrics:

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in train_on_batch(model, inputs, targets, sample_weights, output_loss_metrics)
    309           sample_weights=sample_weights,
    310           training=True,
--> 311           output_loss_metrics=output_loss_metrics))
    312   if not isinstance(outs, list):
    313     outs = [outs]

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in _process_single_batch(model, inputs, targets, output_loss_metrics, sample_weights, training)
    250               output_loss_metrics=output_loss_metrics,
    251               sample_weights=sample_weights,
--> 252               training=training))
    253       if total_loss is None:
    254         raise ValueError('The model cannot be run '

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in _model_loss(model, inputs, targets, output_loss_metrics, sample_weights, training)
    164 
    165         if hasattr(loss_fn, 'reduction'):
--> 166           per_sample_losses = loss_fn.call(targets[i], outs[i])
    167           weighted_losses = losses_utils.compute_weighted_loss(
    168               per_sample_losses,

IndexError: list index out of range

I don't know which list the error is referring to.

Recommended answer

I'm doing something similar in TF 2.0, with a couple of differences that may address your issues. Split parsed_record into features and label:

    feature, label = parsed_record['feature'], parsed_record['label']
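
One way to apply this split is inside the parse function itself, so that every dataset element is an `(inputs, targets)` tuple. Keras `fit` reads targets from the second item of such a tuple, whereas a bare dict is treated as inputs only, which is likely why the `targets[i]` lookup in the traceback has nothing to index. The sketch below assumes the question's feature layout; `parse_as_tuple` and `make_record` are hypothetical names, and an in-memory list of serialized records stands in for the TFRecord file.

```python
import numpy as np
import tensorflow as tf

feature_description = {
    "feature": tf.io.FixedLenFeature([32], tf.float32),
    "label": tf.io.FixedLenFeature([16], tf.int64),
}


def parse_as_tuple(example_proto):
    """Parses one serialized tf.Example into an (inputs, targets) pair."""
    parsed = tf.io.parse_single_example(example_proto, feature_description)
    # Cast the one-hot label to float32 to match the softmax output dtype.
    return parsed["feature"], tf.cast(parsed["label"], tf.float32)


def make_record(rng):
    """Hypothetical helper: one random record in the question's layout."""
    features = rng.uniform(-1.0, 1.0, size=32)
    label = np.eye(16, dtype=np.int64)[rng.randint(16)]
    example = tf.train.Example(features=tf.train.Features(feature={
        "feature": tf.train.Feature(float_list=tf.train.FloatList(value=features)),
        "label": tf.train.Feature(int64_list=tf.train.Int64List(value=label)),
    }))
    return example.SerializeToString()


rng = np.random.RandomState(0)
records = [make_record(rng) for _ in range(8)]

# In-memory stand-in for tf.data.TFRecordDataset(filenames).
dataset = tf.data.Dataset.from_tensor_slices(records).map(parse_as_tuple).batch(4)

for features, labels in dataset.take(1):
    print(features.shape, labels.shape, labels.dtype)
```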

To keep getting batches from the dataset, use ds.repeat:

    ds = ds.shuffle(buffer_size=number_of_sample).batch(batch_size).repeat()
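
As a small illustration of what `repeat` changes, the sketch below uses a stand-in dataset of ten all-zero samples (an assumption made only for the demo) and draws more batches than a single pass contains. Because a dataset repeated without a count never ends, the caller decides how many batches to draw; with Keras `fit` this is usually done through the `steps_per_epoch` argument.

```python
import tensorflow as tf

# Stand-in for the parsed TFRecord dataset: 10 (feature, label) pairs of zeros.
ds = tf.data.Dataset.from_tensor_slices((tf.zeros([10, 32]), tf.zeros([10, 16])))

batch_size = 4
ds = ds.shuffle(buffer_size=10).batch(batch_size).repeat()

# One pass holds only 3 batches (4 + 4 + 2 samples), yet 7 can be drawn
# because repeat() restarts the dataset whenever it is exhausted.
batches = list(ds.take(7))
batch_sizes = [int(features.shape[0]) for features, _ in batches]
print(batch_sizes)
```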

My full input pipeline looks like this:

def _parse_function_same_side(example_proto):
    """Extracts features and labels.

    Args:
        example_proto: A serialized tf.Example protocol buffer.

    Returns:
        A tuple (features, labels):
            features: A tensor representing the features.
            labels: A one-hot tensor with the corresponding labels.
    """
    feature_description = {
        "features": tf.io.FixedLenFeature(4, tf.int64),
        "label": tf.io.FixedLenFeature(1, tf.int64),
    }

    parsed_features = tf.io.parse_single_example(example_proto, feature_description)

    features = parsed_features['features']

    # hero_vocab is not shown in this answer; its length is used as the number of classes.
    labels = tf.one_hot(parsed_features['label'], depth=len(hero_vocab))
    return features, labels

def _input_fn(input_filenames, num_epochs=None,
              shuffle=True, batch_size=50, compression_type=""):

    ds = tf.data.TFRecordDataset(input_filenames, compression_type=compression_type)
    ds = ds.map(_parse_function_same_side)

    # Only shuffle if the shuffle flag is set.
    if shuffle:
        ds = ds.shuffle(10000)

    # Group the records into batches of batch_size.
    ds = ds.batch(batch_size)

    # Repeat so batches can keep being drawn (num_epochs=None repeats indefinitely).
    ds = ds.repeat(num_epochs)

    # Return the dataset.
    return ds

After this, I feed the dataset directly to my model.
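
For completeness, here is a minimal end-to-end sketch of that last step, under stated assumptions: the data is random, the file is a temporary `toy.tfrecord`, and the sizes (64 samples, batches of 8, one hidden layer of 64 units) are arbitrary choices for the demo rather than values from the question or the answer. It writes the records, reads them back as `(features, labels)` tuples, and passes the repeated dataset to `model.fit` together with `steps_per_epoch`.

```python
import os
import tempfile

import numpy as np
import tensorflow as tf

NUM_FEATURES, NUM_CLASSES, NUM_SAMPLES, BATCH_SIZE = 32, 16, 64, 8

feature_description = {
    "feature": tf.io.FixedLenFeature([NUM_FEATURES], tf.float32),
    "label": tf.io.FixedLenFeature([NUM_CLASSES], tf.int64),
}


def parse_example(example_proto):
    """Returns (features, labels) so Keras can tell inputs from targets."""
    parsed = tf.io.parse_single_example(example_proto, feature_description)
    return parsed["feature"], tf.cast(parsed["label"], tf.float32)


# Write a small random dataset to a temporary TFRecord file.
path = os.path.join(tempfile.mkdtemp(), "toy.tfrecord")
rng = np.random.RandomState(0)
with tf.io.TFRecordWriter(path) as writer:
    for _ in range(NUM_SAMPLES):
        x = rng.uniform(-1.0, 1.0, size=NUM_FEATURES)
        y = np.eye(NUM_CLASSES, dtype=np.int64)[rng.randint(NUM_CLASSES)]
        example = tf.train.Example(features=tf.train.Features(feature={
            "feature": tf.train.Feature(float_list=tf.train.FloatList(value=x)),
            "label": tf.train.Feature(int64_list=tf.train.Int64List(value=y)),
        }))
        writer.write(example.SerializeToString())

# Read, parse, shuffle, batch, and repeat.
ds = (tf.data.TFRecordDataset(path)
      .map(parse_example)
      .shuffle(NUM_SAMPLES)
      .batch(BATCH_SIZE)
      .repeat())

inputs = tf.keras.Input(shape=(NUM_FEATURES,), name="feature")
hidden = tf.keras.layers.Dense(64, activation="relu")(inputs)
outputs = tf.keras.layers.Dense(NUM_CLASSES, activation="softmax", name="label")(hidden)
model = tf.keras.Model(inputs=inputs, outputs=outputs)
model.compile(optimizer="adam", loss="categorical_crossentropy", metrics=["accuracy"])

# The dataset repeats forever, so steps_per_epoch defines the epoch length.
history = model.fit(ds, epochs=1, steps_per_epoch=NUM_SAMPLES // BATCH_SIZE, verbose=0)
print(history.history["loss"])
```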
