Custom Neural Network Implementation on MNIST using Tensorflow 2.0?


Problem Description


I tried to write a custom implementation of a basic neural network with two hidden layers on the MNIST dataset using *TensorFlow 2.0 beta*, but I'm not sure what went wrong here: my training loss and accuracy seem to be stuck at around 1.5 and 85% respectively. But if I build the same model using Keras, I get very low training loss and accuracy above 95% with just 8-10 epochs.

I believe that maybe I'm not updating my weights or something? So do I need to assign the new weights that I compute in the backprop function back to their respective weights/bias variables?

I would really appreciate it if someone could help me out with this and the few more questions that I've mentioned below.

A few more questions:

1) How do I add Dropout and Batch Normalization layers in this custom implementation? (i.e. making them work at both train and test time)

2) How can I use callbacks in this code? (i.e. making use of the EarlyStopping and ModelCheckpoint callbacks)

3) Is there anything else in my code below that I can optimize further, like maybe making use of the tensorflow 2.x @tf.function decorator, etc.?

4) I also need to extract the final weights that I obtain, for plotting and checking their distributions, to investigate issues like gradient vanishing or exploding. (Eg: maybe Tensorboard)

5) I also want help in writing this code in a more generalized way so I can easily implement other networks like ConvNets (i.e. Conv, MaxPool, etc.) based on this code.

Here's my full code for easy reproducibility:

Note: I know I can use a high-level API like Keras to build the model much more easily, but that is not my goal here. Please understand.

import numpy as np
import os
import logging
logging.getLogger('tensorflow').setLevel(logging.ERROR)
import tensorflow as tf
import tensorflow_datasets as tfds

(x_train, y_train), (x_test, y_test) = tfds.load('mnist', split=['train', 'test'], 
                                                  batch_size=-1, as_supervised=True)

# reshaping
x_train = tf.reshape(x_train, shape=(x_train.shape[0], 784))
x_test  = tf.reshape(x_test, shape=(x_test.shape[0], 784))

ds_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
# rescaling
ds_train = ds_train.map(lambda x, y: (tf.cast(x, tf.float32)/255.0, y))

class Model(object):
    def __init__(self, hidden1_size, hidden2_size, device=None):
        # layer sizes along with input and output
        self.input_size, self.output_size, self.device = 784, 10, device
        self.hidden1_size, self.hidden2_size = hidden1_size, hidden2_size
        self.lr_rate = 1e-03

        # weight initialization
        self.glorot_init = tf.initializers.glorot_uniform(seed=42)
        # weights b/w input to hidden1 --> 1
        self.w_h1 = tf.Variable(self.glorot_init((self.input_size, self.hidden1_size)))
        # weights b/w hidden1 to hidden2 ---> 2
        self.w_h2 = tf.Variable(self.glorot_init((self.hidden1_size, self.hidden2_size)))
        # weights b/w hidden2 to output ---> 3
        self.w_out = tf.Variable(self.glorot_init((self.hidden2_size, self.output_size)))

        # bias initialization
        self.b1 = tf.Variable(self.glorot_init((self.hidden1_size,)))
        self.b2 = tf.Variable(self.glorot_init((self.hidden2_size,)))
        self.b_out = tf.Variable(self.glorot_init((self.output_size,)))

        self.variables = [self.w_h1, self.b1, self.w_h2, self.b2, self.w_out, self.b_out]


    def feed_forward(self, x):
        if self.device is not None:
            with tf.device('gpu:0' if self.device=='gpu' else 'cpu'):
                # layer1
                self.layer1 = tf.nn.sigmoid(tf.add(tf.matmul(x, self.w_h1), self.b1))
                # layer2
                self.layer2 = tf.nn.sigmoid(tf.add(tf.matmul(self.layer1,
                                                             self.w_h2), self.b2))
                # output layer
                self.output = tf.nn.softmax(tf.add(tf.matmul(self.layer2,
                                                             self.w_out), self.b_out))
        return self.output

    def loss_fn(self, y_pred, y_true):
        self.loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y_true, 
                                                                  logits=y_pred)
        return tf.reduce_mean(self.loss)

    def acc_fn(self, y_pred, y_true):
        y_pred = tf.cast(tf.argmax(y_pred, axis=1), tf.int32)
        y_true = tf.cast(y_true, tf.int32)
        predictions = tf.cast(tf.equal(y_true, y_pred), tf.float32)
        return tf.reduce_mean(predictions)

    def backward_prop(self, batch_xs, batch_ys):
        optimizer = tf.keras.optimizers.Adam(learning_rate=self.lr_rate)
        with tf.GradientTape() as tape:
            predicted = self.feed_forward(batch_xs)
            step_loss = self.loss_fn(predicted, batch_ys)
        grads = tape.gradient(step_loss, self.variables)
        optimizer.apply_gradients(zip(grads, self.variables))

n_shape = x_train.shape[0]
epochs = 20
batch_size = 128

ds_train = ds_train.repeat().shuffle(n_shape).batch(batch_size).prefetch(batch_size)

neural_net = Model(512, 256, 'gpu')

for epoch in range(epochs):
    no_steps = n_shape//batch_size
    avg_loss = 0.
    avg_acc = 0.
    for (batch_xs, batch_ys) in ds_train.take(no_steps):
        preds = neural_net.feed_forward(batch_xs)
        avg_loss += float(neural_net.loss_fn(preds, batch_ys)/no_steps) 
        avg_acc += float(neural_net.acc_fn(preds, batch_ys) /no_steps)
        neural_net.backward_prop(batch_xs, batch_ys)
    print(f'Epoch: {epoch}, Training Loss: {avg_loss}, Training ACC: {avg_acc}')

# output for 10 epochs:
Epoch: 0, Training Loss: 1.7005115111824125, Training ACC: 0.7603832868262543
Epoch: 1, Training Loss: 1.6052448933478445, Training ACC: 0.8524806404020637
Epoch: 2, Training Loss: 1.5905528008006513, Training ACC: 0.8664196092868224
Epoch: 3, Training Loss: 1.584107405738905, Training ACC: 0.8727630912326276
Epoch: 4, Training Loss: 1.5792385798413306, Training ACC: 0.8773203844903037
Epoch: 5, Training Loss: 1.5759121985174716, Training ACC: 0.8804754322627559
Epoch: 6, Training Loss: 1.5739163148682564, Training ACC: 0.8826455712551251
Epoch: 7, Training Loss: 1.5722616605926305, Training ACC: 0.8840812018606812
Epoch: 8, Training Loss: 1.569699136307463, Training ACC: 0.8867688354803249
Epoch: 9, Training Loss: 1.5679460542742163, Training ACC: 0.8885049475356936

Solution

I wondered where to start with your multi-part question, and I decided to do so with a statement:

Your code definitely should not look like that, and it is nowhere near current Tensorflow best practices.

Sorry, but debugging it step by step is a waste of everyone's time and would not benefit either of us.

Now, moving to the third point:

3) Is there anything else in my code below that I can optimize further, like maybe making use of the tensorflow 2.x @tf.function decorator etc.?

Yes, you can use tensorflow 2.0 functionality, and it seems like you are running away from it (the tf.function decorator is of no use here actually; leave it for the time being).

Following the new guidelines would alleviate your problems with the 5th point as well, namely:

5) I also want help in writing this code in a more generalized way so I can easily implement other networks like ConvNets (i.e. Conv, MaxPool, etc.) based on this code.

as it's designed specifically for that. After a little introduction, I will try to walk you through those concepts in a few steps:

1. Divide your program into logical parts

Tensorflow did much harm when it comes to code readability; in tf1.x everything was usually crunched in one place: globals followed by function definitions followed by more globals or maybe data loading, all in all a mess. It's not really the developers' fault, as the system's design encouraged those actions.

Now, in tf2.0 the programmer is encouraged to divide their work similarly to the structure one can see in pytorch, chainer and other more user-friendly frameworks.

1.1 Data loading

You were on a good path with Tensorflow Datasets, but you turned away from them for no apparent reason.

Here is your code, with commentary on what's going on:

# You already have tf.data.Dataset objects after load
(x_train, y_train), (x_test, y_test) = tfds.load('mnist', split=['train', 'test'], 
                                                  batch_size=-1, as_supervised=True)

# But you are reshaping them in a strange manner...
x_train = tf.reshape(x_train, shape=(x_train.shape[0], 784))
x_test  = tf.reshape(x_test, shape=(x_test.shape[0], 784))

# And building from slices...
ds_train = tf.data.Dataset.from_tensor_slices((x_train, y_train))
# Unreadable rescaling (there are built-ins for that)

You can easily generalize this idea for any dataset; place it in a separate module, say datasets.py:

import tensorflow as tf
import tensorflow_datasets as tfds


class ImageDatasetCreator:
    @classmethod
    # More portable and readable than dividing by 255
    def _convert_image_dtype(cls, dataset):
        return dataset.map(
            lambda image, label: (
                tf.image.convert_image_dtype(image, tf.float32),
                label,
            )
        )

    def __init__(self, name: str, batch: int, cache: bool = True, split=None):
        # Load dataset, every dataset has default train, test split
        dataset = tfds.load(name, as_supervised=True, split=split)
        # Convert to float range
        try:
            self.train = ImageDatasetCreator._convert_image_dtype(dataset["train"])
            self.test = ImageDatasetCreator._convert_image_dtype(dataset["test"])
        except KeyError as exception:
            raise ValueError(
                f"Dataset {name} does not have train and test, write your own custom dataset handler."
            ) from exception

        if cache:
            self.train = self.train.cache()  # speed things up considerably
            self.test = self.test.cache()

        self.batch: int = batch

    def get_train(self):
        return self.train.shuffle(10000).batch(self.batch).repeat()  # shuffle needs a buffer size; 10000 is arbitrary

    def get_test(self):
        return self.test.batch(self.batch).repeat()

So now you can load more than mnist using a simple command:

from datasets import ImageDatasetCreator

if __name__ == "__main__":
    dataloader = ImageDatasetCreator("mnist", batch=64, cache=True)
    train, test = dataloader.get_train(), dataloader.get_test()

And from now on, you can pass any dataset name other than mnist to load other datasets.

Please, stop making everything deep learning related into one-off scripts; you are a programmer as well.

1.2 Model creation

Since tf2.0 there are two advised ways one can proceed, depending on the model's complexity:

  • tensorflow.keras.models.Sequential - this way was shown by @Stewart_R, no need to reiterate his points. It is used for the simplest models (you should use this one with your feedforward); a minimal sketch follows this list.
  • Inheriting tensorflow.keras.Model and writing a custom model. This one should be used when you have some kind of logic inside your module or it's more complicated (things like ResNets, multipath networks etc.). All in all, it is more readable and customizable.
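
For reference, here is a minimal sketch of the Sequential option (this is not @Stewart_R's exact answer; the layer sizes are illustrative):

import tensorflow as tf

# A minimal Sequential feedforward classifier for MNIST digits;
# layer sizes here are illustrative, not prescriptive
model = tf.keras.Sequential(
    [
        tf.keras.layers.Flatten(input_shape=(28, 28, 1)),
        tf.keras.layers.Dense(128, activation="relu"),
        tf.keras.layers.Dense(10),  # logits; pair with a from_logits=True loss
    ]
)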

Your Model class tried to resemble something like that, but it went south again; backprop is definitely not part of the model itself, and neither is loss or accuracy. Separate them into another module or function; they are definitely not members!

That said, let's code the network using the second approach (you should place this code in model.py for brevity). Before that, I will code a YourDense feedforward layer from scratch by inheriting from tf.keras.layers.Layer (this one might go into a layers.py module):

import tensorflow as tf

class YourDense(tf.keras.layers.Layer):
    def __init__(self, units):
        # It's Python 3, you don't have to specify super parents explicitly
        super().__init__()
        self.units = units

    # Use build to create variables, as shape can be inferred from previous layers
    # If you were to create layers in __init__, one would have to provide input_shape
    # (same as it occurs in PyTorch for example)
    def build(self, input_shape):
        # You could use different initializers here as well
        self.kernel = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer="random_normal",
            trainable=True,
        )
        # You could define bias in __init__ as well as it's not input dependent
        self.bias = self.add_weight(shape=(self.units,), initializer="random_normal")
        # Oh, trainable=True is default

    def call(self, inputs):
        # Use overloaded operators instead of tf.add, better readability
        return tf.matmul(inputs, self.kernel) + self.bias

Regarding your

1) How do I add Dropout and Batch Normalization layers in this custom implementation? (i.e. making them work at both train and test time)

I suppose you would like to create a custom implementation of those layers. If not, you can just import from tensorflow.keras.layers import Dropout and use it anywhere you want, as @Leevo pointed out. An inverted dropout with different behaviour during train and test is shown below:

import tensorflow as tf
from tensorflow.keras import layers

class CustomDropout(layers.Layer):
    def __init__(self, rate, **kwargs):
        super().__init__(**kwargs)
        self.rate = rate

    def call(self, inputs, training=None):
        if training:
            # You could simply create a binary mask and multiply here
            return tf.nn.dropout(inputs, rate=self.rate)
        # tf.nn.dropout is inverted dropout (it already rescales kept units
        # during training), so inputs pass through unchanged at test time
        return inputs

The layer is taken from here and modified to better fit its showcasing purpose.
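
As for Batch Normalization (the other layer from your 1st point), you don't need a custom implementation to get train/test-dependent behaviour; the built-in layer already keys off the training flag. A minimal sketch:

import tensorflow as tf

inputs = tf.random.normal((32, 100))  # dummy batch, purely for illustration
bn = tf.keras.layers.BatchNormalization()
train_out = bn(inputs, training=True)   # normalizes with batch statistics
test_out = bn(inputs, training=False)   # normalizes with moving averages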

Now you can finally create your model (a simple double feedforward):

import tensorflow as tf

from layers import YourDense


class Model(tf.keras.Model):
    def __init__(self):
        super().__init__()
        # Use Sequential here for readability
        self.network = tf.keras.Sequential(
            [YourDense(100), tf.keras.layers.ReLU(), YourDense(10)]
        )

    def call(self, inputs):
        # You can use non-parametric layers inside call as well
        flattened = tf.keras.layers.Flatten()(inputs)
        return self.network(flattened)

Of course, you should use built-ins as much as possible in general implementations.

This structure is pretty extensible, so generalization to convolutional nets, resnets, senets, whatever else, should be done via this module. You can read more about it here.
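
To illustrate, a convolutional variant is only a sketch away (the layer sizes here are arbitrary, chosen just to show the pattern):

import tensorflow as tf


class ConvModel(tf.keras.Model):
    def __init__(self):
        super().__init__()
        # Same pattern as above, just with convolutional building blocks
        self.network = tf.keras.Sequential(
            [
                tf.keras.layers.Conv2D(32, 3, activation="relu"),
                tf.keras.layers.MaxPool2D(),
                tf.keras.layers.Conv2D(64, 3, activation="relu"),
                tf.keras.layers.MaxPool2D(),
                tf.keras.layers.Flatten(),
                tf.keras.layers.Dense(10),  # logits
            ]
        )

    def call(self, inputs):
        return self.network(inputs)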

I think it fulfills your 5th point:

5) I also want help in writing this code in a more generalized way so I can easily implement other networks like ConvNets (i.e. Conv, MaxPool, etc.) based on this code.

One last thing: you may have to use model.build(shape) in order to build your model's graph.

model.build((None, 28, 28, 1))

This would be for MNIST's 28x28x1 input shape, where None stands for the batch dimension.

1.3 Training

Once again, training could be done in two separate ways:

  • standard Keras model.fit(dataset) - useful in simple tasks like classification
  • tf.GradientTape - more complicated training schemes; the most prominent example would be Generative Adversarial Networks, where two models optimize orthogonal goals playing a minmax game

As pointed out by @Leevo once again, if you use the second way, you won't be able to simply use the callbacks provided by Keras, hence I'd advise sticking with the first option whenever possible.

In theory you could call a callback's functions manually, like on_batch_begin() and others where needed, but it would be cumbersome and I'm not sure how this would work.
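
For completeness, a rough sketch of what that manual invocation might look like, using only the public tf.keras.callbacks API (model and epochs come from your own code, and run_validation is a hypothetical evaluation function you would write yourself):

import tensorflow as tf

early_stopping = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=3)
early_stopping.set_model(model)  # the callback needs a model reference
model.stop_training = False      # the flag EarlyStopping flips when patience runs out
early_stopping.on_train_begin()

for epoch in range(epochs):
    val_loss = run_validation(model)  # hypothetical: your own evaluation loop
    early_stopping.on_epoch_end(epoch, logs={"val_loss": val_loss})
    if model.stop_training:
        break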

When it comes to the first option, you can use tf.data.Dataset objects directly with fit. Here it is, presented inside another module (preferably train.py):

import datetime
import pathlib

import tensorflow as tf


def train(
    model: tf.keras.Model,
    path: str,
    train: tf.data.Dataset,
    epochs: int,
    steps_per_epoch: int,
    validation: tf.data.Dataset,
    steps_per_validation: int,
    stopping_epochs: int,
    optimizer=tf.optimizers.Adam(),
):
    model.compile(
        optimizer=optimizer,
        # I used logits as output from the last layer, hence this
        loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[tf.metrics.SparseCategoricalAccuracy()],
    )

    model.fit(
        train,
        epochs=epochs,
        steps_per_epoch=steps_per_epoch,
        validation_data=validation,
        validation_steps=steps_per_validation,
        callbacks=[
            # Tensorboard logging
            tf.keras.callbacks.TensorBoard(
                pathlib.Path("logs")
                / pathlib.Path(datetime.datetime.now().strftime("%Y%m%d-%H%M%S")),
                histogram_freq=1,
            ),
            # Early stopping with best weights preserving
            tf.keras.callbacks.EarlyStopping(
                monitor="val_sparse_categorical_accuracy",
                patience=stopping_epochs,
                restore_best_weights=True,
            ),
        ],
    )
    model.save(path)

The more complicated approach is very similar (almost copy-and-paste) to PyTorch training loops, so if you are familiar with those, it should not pose much of a problem.

You can find examples throughout tf2.0 docs, e.g. here or here.

2. Other things

2.1 Unanswered questions

4) Is there anything else in my code that I can optimize further, like maybe making use of the tensorflow 2.x @tf.function decorator etc.?

The above already transforms the Model into graphs, hence I don't think you would benefit from calling it in this case. And premature optimization is the root of all evil; remember to measure your code before doing this.

You would gain much more with proper caching of data (as described at the beginning of #1.1) and a good pipeline than from those.
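
If measurement ever does show an eager-mode bottleneck, wrapping the hot function is a one-line change. A toy sketch:

import tensorflow as tf

@tf.function  # traced into a graph on the first call, reused afterwards
def scaled_sum(x):
    return tf.reduce_sum(x) * 2.0

print(scaled_sum(tf.ones((3,))))  # tf.Tensor(6.0, shape=(), dtype=float32)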

5) Also I need a way to extract all my final weights for all layers after training so I can plot them and check their distributions, to check issues like gradient vanishing or exploding.

As pointed out by @Leevo above,

weights = model.get_weights()

would get you the weights. You may transform them into np.array and plot them using seaborn or matplotlib, analyze them, check them, or whatever else you want.
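
A minimal plotting sketch, assuming matplotlib is installed and model is your trained model (one histogram per variable returned by get_weights):

import matplotlib.pyplot as plt
import numpy as np

weights = model.get_weights()  # list of np.ndarray, one per variable
for i, w in enumerate(weights):
    plt.hist(np.ravel(w), bins=50, alpha=0.5, label=f"variable {i}")
plt.legend()
plt.title("Weight distributions")
plt.show()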

2.2 Putting it all together

All in all, your main.py (or entrypoint or something similar) would consist of this (more or less):

from datasets import ImageDatasetCreator
from model import Model
from train import train

# You could use argparse for things like batch, epochs etc.
if __name__ == "__main__":
    dataloader = ImageDatasetCreator("mnist", batch=64, cache=True)
    # Don't shadow the imported train function with the datasets
    train_ds, test_ds = dataloader.get_train(), dataloader.get_test()
    model = Model()
    model.build((None, 28, 28, 1))
    train(
        model, path, train_ds, epochs, len(train_ds) // batch, test_ds, len(test_ds) // batch, ...
    )  # provide necessary arguments appropriately (path, epochs, batch are up to you)
    # Do whatever you want with those
    weights = model.get_weights()

Oh, and remember that the functions above are not for copy-pasting and should be treated more like a guideline. Hit me up if you have any questions.

3. Questions from comments

3.1 How to initialize custom and built-in layers

3.1.1 TLDR what you are about to read

  • A custom Poisson initialization function, but it takes three arguments
  • The tf.keras.initializers API needs two arguments (see the last point in their docs), hence one is specified via Python's lambda inside the custom layer we have written before
  • An optional bias for the layer is added, which can be turned off with a boolean

Why is it so uselessly complicated? To show that in tf2.0 you can finally use Python's functionality, with no more graph hassle, if instead of tf.cond, etc.
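
For contrast, a toy sketch of the same kind of branch in the old graph style versus plain tf2.0 Python (the branch itself is arbitrary):

import tensorflow as tf

x = tf.constant(3.0)

# tf1.x graph style: both branches wrapped in lambdas for tf.cond
y_graph = tf.cond(x > 0, lambda: x * 2.0, lambda: x / 2.0)

# tf2.0 eager style: a plain Python conditional works directly
y_eager = x * 2.0 if x > 0 else x / 2.0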

3.1.2 From TLDR to implementation

Keras initializers can be found here and Tensorflow's flavor here.

Please note the API inconsistencies (capital letters for classes, small letters with underscores for functions), especially in tf2.0, but that's beside the point.

You can use them by passing a string (as it's done in YourDense above) or during object creation.

To allow for custom initialization in your custom layers, you can simply add an additional argument to the constructor (the tf.keras.Model class is still a Python class, and its __init__ should be used the same as Python's).

Before that, I will show you how to create a custom initialization:

# Poisson custom initialization because why not.
def my_dumb_init(shape, lam, dtype=None):
    return tf.squeeze(tf.random.poisson(shape, lam, dtype=dtype))

Notice that its signature takes three arguments, while it should take (shape, dtype) only. Still, one can "fix" this easily while creating one's own layer, like the extended YourDense below:

import typing

import tensorflow as tf


class YourDense(tf.keras.layers.Layer):
    # It's still Python, use it as Python, that's the point of tf.2.0
    @classmethod
    def register_initialization(cls, initializer):
        # Set defaults if init not provided by user
        if initializer is None:
            # let's make the signature proper for init in tf.keras
            return lambda shape, dtype: my_dumb_init(shape, 1, dtype)
        return initializer

    def __init__(
        self,
        units: int,
        bias: bool = True,
        # can be string or callable, some typing info added as well...
        kernel_initializer: typing.Union[str, typing.Callable] = None,
        bias_initializer: typing.Union[str, typing.Callable] = None,
    ):
        super().__init__()
        self.units: int = units
        self.kernel_initializer = YourDense.register_initialization(kernel_initializer)
        if bias:
            self.bias_initializer = YourDense.register_initialization(bias_initializer)
        else:
            self.bias_initializer = None

    def build(self, input_shape):
        # Simply pass your init here
        self.kernel = self.add_weight(
            shape=(input_shape[-1], self.units),
            initializer=self.kernel_initializer,
            trainable=True,
        )
        if self.bias_initializer is not None:
            self.bias = self.add_weight(
                shape=(self.units,), initializer=self.bias_initializer
            )
        else:
            self.bias = None

    def call(self, inputs):
        weights = tf.matmul(inputs, self.kernel)
        if self.bias is not None:
            return weights + self.bias
        return weights

I have added my_dumb_init as the default (if the user does not provide one) and made the bias optional with the bias argument. Note that you can use if freely as long as it's not data dependent. If it is (or is dependent on tf.Tensor somehow), one has to use the @tf.function decorator, which changes Python's flow to its tensorflow counterpart (e.g. if to tf.cond).

See here for more on autograph; it's very easy to follow.

If you want to incorporate the above initializer changes into your model, you have to create the appropriate object, and that's it.

... # Previous code of the Model here
self.network = tf.keras.Sequential(
    [
        YourDense(100, bias=False, kernel_initializer="lecun_uniform"),
        tf.keras.layers.ReLU(),
        YourDense(10, bias_initializer=tf.initializers.Ones()),
    ]
)
... # and the same afterwards

With the built-in tf.keras.layers.Dense layer, one can do the same (argument names differ, but the idea holds).
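
For example, the Sequential snippet above translated to the built-in layer might look like this (note use_bias instead of bias):

import tensorflow as tf

network = tf.keras.Sequential(
    [
        tf.keras.layers.Dense(100, use_bias=False, kernel_initializer="lecun_uniform"),
        tf.keras.layers.ReLU(),
        tf.keras.layers.Dense(10, bias_initializer=tf.initializers.Ones()),
    ]
)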

3.2 Automatic Differentiation using tf.GradientTape

3.2.1 Intro

The point of tf.GradientTape is to allow users normal Python control flow and gradient calculation of variables with respect to other variables.

Example taken from here but broken into separate pieces:

def f(x, y):
  output = 1.0
  for i in range(y):
    if i > 1 and i < 5:
      output = tf.multiply(output, x)
  return output

A regular Python function with for and if flow-control statements.

def grad(x, y):
  with tf.GradientTape() as t:
    t.watch(x)
    out = f(x, y)
  return t.gradient(out, x)

Using gradient tape, you can record all operations on Tensors (and their intermediate states as well) and "play" them backwards (perform automatic backward differentiation using the chain rule).

Every Tensor within the tf.GradientTape() context manager is recorded automatically. If some Tensor is out of scope, use the watch() method as one can see above.

Finally, the gradient of output with respect to x (the input) is returned.
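
To make it concrete, a quick usage check (values chosen by hand): f(x, 6) multiplies output by x for i in {2, 3, 4}, so it computes x**3, whose derivative at x = 2 is 3 * x**2 = 12.

x = tf.constant(2.0)
print(grad(x, 6).numpy())  # expected: 12.0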

3.2.2 Connection with deep learning

What was described above is the backpropagation algorithm. Gradients w.r.t. (with respect to) outputs are calculated for each node in the network (or rather for every layer). Those gradients are then used by various optimizers to make corrections, and so it repeats.

Let's continue and assume you already have your tf.keras.Model, optimizer instance, tf.data.Dataset and loss function set up.

One can define a Trainer class which will perform training for us. Please read the comments in the code if in doubt:

class Trainer:
    def __init__(self, model, optimizer, loss_function):
        self.model = model
        self.loss_function = loss_function
        self.optimizer = optimizer
        # You could pass custom metrics in constructor
        # and adjust train_step and test_step accordingly
        self.train_loss = tf.keras.metrics.Mean(name="train_loss")
        self.test_loss = tf.keras.metrics.Mean(name="test_loss")

    def train_step(self, x, y):
        # Setup tape
        with tf.GradientTape() as tape:
            # Get current predictions of network
            y_pred = self.model(x)
            # Calculate loss generated by predictions
            loss = self.loss_function(y, y_pred)
        # Get gradients of loss w.r.t. EVERY trainable variable (iterable returned)
        gradients = tape.gradient(loss, self.model.trainable_variables)
        # Change trainable variable values according to gradient by applying optimizer policy
        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))
        # Record loss of current step
        self.train_loss(loss)

    def train(self, dataset):
        # For N epochs iterate over dataset and perform train steps each time
        for x, y in dataset:
            self.train_step(x, y)

    def test_step(self, x, y):
        # Record test loss separately
        self.test_loss(self.loss_function(y, self.model(x)))

    def test(self, dataset):
        # Iterate over whole dataset
        for x, y in dataset:
            self.test_step(x, y)

    def __str__(self):
        # You need Python 3.7 with f-string support
        # Just return metrics
        return f"Loss: {self.train_loss.result()}, Test Loss: {self.test_loss.result()}"

Now, you can use this class in your code really simply, like this:

EPOCHS = 5

# model, optimizer, loss defined beforehand
trainer = Trainer(model, optimizer, loss)
for epoch in range(EPOCHS):
    trainer.train(train_dataset) # Same for training and test datasets
    trainer.test(test_dataset)
    print(f"Epoch {epoch}: {trainer}")

The print would tell you the training and test loss for each epoch. You can mix training and testing any way you want (e.g. 5 epochs of training and 1 of testing), and you could add different metrics, etc.

See here if you want a non-OOP-oriented approach (IMO less readable, but to each their own).
