Personalizza ciò che accade in Model.fit

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza l'origine su GitHub Scarica quaderno

introduzione

Quando si sta facendo apprendimento supervisionato, è possibile utilizzare fit() e tutto funziona senza intoppi.

Quando è necessario scrivere il proprio ciclo di formazione da zero, è possibile utilizzare il GradientTape e prendere il controllo di ogni piccolo dettaglio.

Ma cosa succede se avete bisogno di un algoritmo di training personalizzato, ma si desidera comunque beneficiare delle pratiche funzioni di fit() , come callback, il supporto integrato di distribuzione, o fase di fusione?

Un principio chiave di Keras è progressiva divulgazione di complessità. Dovresti sempre essere in grado di entrare nei flussi di lavoro di livello inferiore in modo graduale. Non dovresti cadere da un dirupo se la funzionalità di alto livello non corrisponde esattamente al tuo caso d'uso. Dovresti essere in grado di ottenere un maggiore controllo sui piccoli dettagli pur mantenendo una quantità proporzionata di praticità di alto livello.

Quando hai bisogno di personalizzare ciò che fit() lo fa, si dovrebbe ignorare la funzione fase di formazione del Model di classe. Questa è la funzione che viene chiamato dal fit() per ogni serie di dati. Sarà quindi in grado di chiamare fit() come al solito - e sarà in esecuzione il proprio algoritmo di apprendimento.

Tieni presente che questo modello non ti impedisce di creare modelli con l'API funzionale. È possibile farlo se si sta costruendo Sequential modelli, modelli funzionali API, o modelli sottoclasse.

Vediamo come funziona.

Impostare

Richiede TensorFlow 2.2 o successivo.

import tensorflow as tf
from tensorflow import keras

Un primo semplice esempio

Partiamo da un semplice esempio:

  • Creiamo una nuova classe che le sottoclassi keras.Model .
  • Abbiamo appena sovrascrivere il metodo train_step(self, data) .
  • Restituiamo un dizionario che mappa i nomi delle metriche (inclusa la perdita) al loro valore corrente.

L'argomento di input data è ciò che viene passato per adattarsi come dati di allenamento:

  • Se si passa le matrici NumPy, chiamando fit(x, y, ...) , allora data saranno la tupla (x, y)
  • Se si passa un tf.data.Dataset , chiamando fit(dataset, ...) , allora data saranno ciò che viene prodotto dai dataset in ogni lotto.

Nel corpo del train_step metodo, implementiamo un aggiornamento allenamento regolare, simile a quello che si ha già familiarità con. È importante sottolineare che, calcoliamo la perdita tramite self.compiled_loss , che avvolge la funzione di perdita (es) (s) che sono stati passati alla compile() .

Analogamente, chiamiamo self.compiled_metrics.update_state(y, y_pred) per aggiornare lo stato dei parametri che sono stati passati in compile() , e di query risultati self.metrics alla fine di recuperare il loro valore corrente.

class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value
            # (the loss function is configured in `compile()`)
            loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        # Update metrics (includes the metric that tracks the loss)
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value
        return {m.name: m.result() for m in self.metrics}

Proviamo questo:

import numpy as np

# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3
32/32 [==============================] - 1s 2ms/step - loss: 0.9909 - mae: 0.8601
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.4363 - mae: 0.5345
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.2906 - mae: 0.4311
<keras.callbacks.History at 0x7f5ad1ca1090>

Andare di livello inferiore

Naturalmente, si può solo saltare passando una funzione di perdita in compile() , e invece fare tutto manualmente in train_step . Allo stesso modo per le metriche.

Ecco un esempio di livello inferiore, che utilizza solo compile() per configurare l'ottimizzatore:

  • Iniziamo con la creazione di Metric casi per monitorare la nostra perdita e un punteggio MAE.
  • Noi attuare una consuetudine train_step() che aggiorna lo stato di queste metriche (chiamando update_state() su di loro), poi le query (tramite result() ) per restituire il valore medio della corrente, da visualizzare sulla barra di avanzamento e di essere passare a qualsiasi richiamata.
  • Nota che avremmo avuto bisogno di chiamare reset_states() sulle nostre metriche tra ogni epoca! In caso contrario, chiamando result() restituirebbe una media dall'inizio della formazione, mentre noi di solito lavoro con le medie per-epoca. Fortunatamente, il quadro può farlo per noi: basta elencare qualsiasi metrica che si desidera ripristinare nel metrics di proprietà del modello. Il modello chiamerà reset_states() su qualsiasi oggetto elencato qui all'inizio di ogni fit() epoca o all'inizio di una chiamata a evaluate() .
loss_tracker = keras.metrics.Mean(name="loss")
mae_metric = keras.metrics.MeanAbsoluteError(name="mae")


class CustomModel(keras.Model):
    def train_step(self, data):
        x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute our own loss
            loss = keras.losses.mean_squared_error(y, y_pred)

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Compute our own metrics
        loss_tracker.update_state(loss)
        mae_metric.update_state(y, y_pred)
        return {"loss": loss_tracker.result(), "mae": mae_metric.result()}

    @property
    def metrics(self):
        # We list our `Metric` objects here so that `reset_states()` can be
        # called automatically at the start of each epoch
        # or at the start of `evaluate()`.
        # If you don't implement this property, you have to call
        # `reset_states()` yourself at the time of your choosing.
        return [loss_tracker, mae_metric]


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)

# We don't passs a loss or metrics here.
model.compile(optimizer="adam")

# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5
32/32 [==============================] - 0s 1ms/step - loss: 1.5969 - mae: 1.1523
Epoch 2/5
32/32 [==============================] - 0s 1ms/step - loss: 0.7352 - mae: 0.7310
Epoch 3/5
32/32 [==============================] - 0s 1ms/step - loss: 0.3830 - mae: 0.4999
Epoch 4/5
32/32 [==============================] - 0s 1ms/step - loss: 0.2809 - mae: 0.4215
Epoch 5/5
32/32 [==============================] - 0s 1ms/step - loss: 0.2590 - mae: 0.4058
<keras.callbacks.History at 0x7f5ad1b62c50>

Sostenere sample_weight & class_weight

Potresti aver notato che il nostro primo esempio di base non menzionava la ponderazione del campione. Se si vuole sostenere l' fit() argomenti sample_weight e class_weight , devi semplicemente effettuare le seguenti operazioni:

  • Disimballare sample_weight dalla data argomento
  • Passarlo a compiled_loss & compiled_metrics (naturalmente, si potrebbe anche solo applicarlo manualmente se non si basano su compile() per le perdite e metriche)
  • Questo è tutto. Questa è la lista.
class CustomModel(keras.Model):
    def train_step(self, data):
        # Unpack the data. Its structure depends on your model and
        # on what you pass to `fit()`.
        if len(data) == 3:
            x, y, sample_weight = data
        else:
            sample_weight = None
            x, y = data

        with tf.GradientTape() as tape:
            y_pred = self(x, training=True)  # Forward pass
            # Compute the loss value.
            # The loss function is configured in `compile()`.
            loss = self.compiled_loss(
                y,
                y_pred,
                sample_weight=sample_weight,
                regularization_losses=self.losses,
            )

        # Compute gradients
        trainable_vars = self.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update the metrics.
        # Metrics are configured in `compile()`.
        self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)

        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])

# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1365 - mae: 0.4196
Epoch 2/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1285 - mae: 0.4068
Epoch 3/3
32/32 [==============================] - 0s 2ms/step - loss: 0.1212 - mae: 0.3971
<keras.callbacks.History at 0x7f5ad1ba64d0>

Fornire il proprio passaggio di valutazione

Che cosa succede se si vuole fare lo stesso per le chiamate a model.evaluate() ? Poi si sarebbe ignorare test_step esattamente nello stesso modo. Ecco come appare:

class CustomModel(keras.Model):
    def test_step(self, data):
        # Unpack the data
        x, y = data
        # Compute predictions
        y_pred = self(x, training=False)
        # Updates the metrics tracking the loss
        self.compiled_loss(y, y_pred, regularization_losses=self.losses)
        # Update the metrics.
        self.compiled_metrics.update_state(y, y_pred)
        # Return a dict mapping metric names to current value.
        # Note that it will include the loss (tracked in self.metrics).
        return {m.name: m.result() for m in self.metrics}


# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])

# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 [==============================] - 0s 1ms/step - loss: 2.7584 - mae: 1.5920
[2.758362054824829, 1.59201979637146]

Conclusione: un esempio GAN end-to-end

Esaminiamo un esempio end-to-end che sfrutta tutto ciò che hai appena appreso.

Consideriamo:

  • Una rete di generatori pensata per generare immagini 28x28x1.
  • Una rete discriminatoria destinata a classificare le immagini 28x28x1 in due classi ("false" e "reali").
  • Un ottimizzatore per ciascuno.
  • Una funzione di perdita per addestrare il discriminatore.
from tensorflow.keras import layers

# Create the discriminator
discriminator = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.GlobalMaxPooling2D(),
        layers.Dense(1),
    ],
    name="discriminator",
)

# Create the generator
latent_dim = 128
generator = keras.Sequential(
    [
        keras.Input(shape=(latent_dim,)),
        # We want to generate 128 coefficients to reshape into a 7x7x128 map
        layers.Dense(7 * 7 * 128),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7, 7, 128)),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
    ],
    name="generator",
)

Ecco una classe GAN feature-complete, ignorando compile() per utilizzare la propria firma, e l'attuazione l'intero algoritmo GAN in 17 righe in train_step :

class GAN(keras.Model):
    def __init__(self, discriminator, generator, latent_dim):
        super(GAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(GAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    def train_step(self, real_images):
        if isinstance(real_images, tuple):
            real_images = real_images[0]
        # Sample random points in the latent space
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Decode them to fake images
        generated_images = self.generator(random_latent_vectors)

        # Combine them with real images
        combined_images = tf.concat([generated_images, real_images], axis=0)

        # Assemble labels discriminating real from fake images
        labels = tf.concat(
            [tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
        )
        # Add random noise to the labels - important trick!
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train the discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        # Sample random points in the latent space
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Assemble labels that say "all real images"
        misleading_labels = tf.zeros((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)!
        with tf.GradientTape() as tape:
            predictions = self.discriminator(self.generator(random_latent_vectors))
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
        return {"d_loss": d_loss, "g_loss": g_loss}

Proviamolo:

# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)

gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
    loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)

# To limit the execution time, we only train on 100 batches. You can train on
# the entire dataset. You will need about 20 epochs to get nice results.
gan.fit(dataset.take(100), epochs=1)
100/100 [==============================] - 3s 11ms/step - d_loss: 0.4031 - g_loss: 0.9305
<keras.callbacks.History at 0x7f5ad1b37c50>

Le idee alla base del deep learning sono semplici, quindi perché la loro implementazione dovrebbe essere dolorosa?