Visualizza su TensorFlow.org | Esegui in Google Colab | Visualizza l'origine su GitHub | Scarica quaderno |
Impostare
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
introduzione
Keras offre corsi di formazione di default e cicli di valutazione, fit()
e evaluate()
. Il loro uso è coperto nella guida Formazione e valutazione con i metodi built-in .
Se si desidera personalizzare l'algoritmo di apprendimento del modello, pur sfruttando la comodità di fit()
(per esempio, quello di formare un GAN con fit()
), è possibile creare una sottoclasse del Model
di classe e implementare il proprio train_step()
metodo, che si chiama ripetutamente durante fit()
. Questo è trattato nel manuale Personalizzazione quanto accade in fit()
.
Ora, se desideri un controllo di livello molto basso su formazione e valutazione, dovresti scrivere i tuoi cicli di formazione e valutazione da zero. Questo è l'argomento di questa guida.
Utilizzando il GradientTape
: un primo esempio end-to-end
Chiamare un modello all'interno di un GradientTape
ambito vi permette di recuperare i gradienti dei pesi addestrabili dello strato rispetto ad un valore di perdita. L'utilizzo di un esempio di ottimizzazione, è possibile utilizzare questi gradienti di aggiornare queste variabili (che è possibile recuperare utilizzando model.trainable_weights
).
Consideriamo un semplice modello MNIST:
inputs = keras.Input(shape=(784,), name="digits")
x1 = layers.Dense(64, activation="relu")(inputs)
x2 = layers.Dense(64, activation="relu")(x1)
outputs = layers.Dense(10, name="predictions")(x2)
model = keras.Model(inputs=inputs, outputs=outputs)
Alleniamolo utilizzando un gradiente mini-batch con un ciclo di allenamento personalizzato.
Innanzitutto, avremo bisogno di un ottimizzatore, una funzione di perdita e un set di dati:
# Instantiate an optimizer.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# Prepare the training dataset.
batch_size = 64
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))
# Reserve 10,000 samples for validation.
x_val = x_train[-10000:]
y_val = y_train[-10000:]
x_train = x_train[:-10000]
y_train = y_train[:-10000]
# Prepare the training dataset.
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)
# Prepare the validation dataset.
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(batch_size)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz 11493376/11490434 [==============================] - 1s 0us/step 11501568/11490434 [==============================] - 1s 0us/step
Ecco il nostro ciclo di allenamento:
- Apriamo una
for
ciclo che itera su epoche - Per ogni epoca, si apre una
for
ciclo che itera il set di dati, in lotti - Per ciascun lotto, si apre una
GradientTape()
portata - All'interno di questo ambito chiamiamo il modello (forward pass) e calcoliamo la perdita
- Al di fuori dell'ambito, recuperiamo i gradienti dei pesi del modello rispetto alla perdita
- Infine, utilizziamo l'ottimizzatore per aggiornare i pesi del modello in base ai gradienti
epochs = 2
for epoch in range(epochs):
print("\nStart of epoch %d" % (epoch,))
# Iterate over the batches of the dataset.
for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
# Open a GradientTape to record the operations run
# during the forward pass, which enables auto-differentiation.
with tf.GradientTape() as tape:
# Run the forward pass of the layer.
# The operations that the layer applies
# to its inputs are going to be recorded
# on the GradientTape.
logits = model(x_batch_train, training=True) # Logits for this minibatch
# Compute the loss value for this minibatch.
loss_value = loss_fn(y_batch_train, logits)
# Use the gradient tape to automatically retrieve
# the gradients of the trainable variables with respect to the loss.
grads = tape.gradient(loss_value, model.trainable_weights)
# Run one step of gradient descent by updating
# the value of the variables to minimize the loss.
optimizer.apply_gradients(zip(grads, model.trainable_weights))
# Log every 200 batches.
if step % 200 == 0:
print(
"Training loss (for one batch) at step %d: %.4f"
% (step, float(loss_value))
)
print("Seen so far: %s samples" % ((step + 1) * batch_size))
Start of epoch 0 Training loss (for one batch) at step 0: 68.7478 Seen so far: 64 samples Training loss (for one batch) at step 200: 1.9448 Seen so far: 12864 samples Training loss (for one batch) at step 400: 1.1859 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.6914 Seen so far: 38464 samples Start of epoch 1 Training loss (for one batch) at step 0: 0.9113 Seen so far: 64 samples Training loss (for one batch) at step 200: 0.9550 Seen so far: 12864 samples Training loss (for one batch) at step 400: 0.5139 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.7227 Seen so far: 38464 samples
Gestione di basso livello delle metriche
Aggiungiamo il monitoraggio delle metriche a questo ciclo di base.
Puoi facilmente riutilizzare le metriche integrate (o quelle personalizzate che hai scritto) in tali cicli di allenamento scritti da zero. Ecco il flusso:
- Istanzia la metrica all'inizio del ciclo
- Chiamare
metric.update_state()
dopo ogni lotto - Chiamare
metric.result()
quando è necessario per visualizzare il valore corrente della metrica - Chiamare
metric.reset_states()
quando è necessario cancellare lo stato della metrica (in genere alla fine di un'epoca)
Usiamo questa conoscenza per calcolare SparseCategoricalAccuracy
sui dati di convalida al termine di ogni epoca:
# Get model
inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu", name="dense_1")(inputs)
x = layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = layers.Dense(10, name="predictions")(x)
model = keras.Model(inputs=inputs, outputs=outputs)
# Instantiate an optimizer to train the model.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# Prepare the metrics.
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()
Ecco il nostro ciclo di formazione e valutazione:
import time
epochs = 2
for epoch in range(epochs):
print("\nStart of epoch %d" % (epoch,))
start_time = time.time()
# Iterate over the batches of the dataset.
for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
with tf.GradientTape() as tape:
logits = model(x_batch_train, training=True)
loss_value = loss_fn(y_batch_train, logits)
grads = tape.gradient(loss_value, model.trainable_weights)
optimizer.apply_gradients(zip(grads, model.trainable_weights))
# Update training metric.
train_acc_metric.update_state(y_batch_train, logits)
# Log every 200 batches.
if step % 200 == 0:
print(
"Training loss (for one batch) at step %d: %.4f"
% (step, float(loss_value))
)
print("Seen so far: %d samples" % ((step + 1) * batch_size))
# Display metrics at the end of each epoch.
train_acc = train_acc_metric.result()
print("Training acc over epoch: %.4f" % (float(train_acc),))
# Reset training metrics at the end of each epoch
train_acc_metric.reset_states()
# Run a validation loop at the end of each epoch.
for x_batch_val, y_batch_val in val_dataset:
val_logits = model(x_batch_val, training=False)
# Update val metrics
val_acc_metric.update_state(y_batch_val, val_logits)
val_acc = val_acc_metric.result()
val_acc_metric.reset_states()
print("Validation acc: %.4f" % (float(val_acc),))
print("Time taken: %.2fs" % (time.time() - start_time))
Start of epoch 0 Training loss (for one batch) at step 0: 88.9958 Seen so far: 64 samples Training loss (for one batch) at step 200: 2.2214 Seen so far: 12864 samples Training loss (for one batch) at step 400: 1.3083 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.8282 Seen so far: 38464 samples Training acc over epoch: 0.7406 Validation acc: 0.8201 Time taken: 6.31s Start of epoch 1 Training loss (for one batch) at step 0: 0.3276 Seen so far: 64 samples Training loss (for one batch) at step 200: 0.4819 Seen so far: 12864 samples Training loss (for one batch) at step 400: 0.5971 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.5862 Seen so far: 38464 samples Training acc over epoch: 0.8474 Validation acc: 0.8676 Time taken: 5.98s
Accelerazione vostro passo allenamento con tf.function
Il runtime predefinito in tensorflow 2 è esecuzione ansioso . Pertanto, il nostro ciclo di allenamento sopra viene eseguito con entusiasmo.
Questo è ottimo per il debug, ma la compilazione del grafico ha un netto vantaggio in termini di prestazioni. La descrizione del calcolo come un grafico statico consente al framework di applicare ottimizzazioni delle prestazioni globali. Ciò è impossibile quando il framework è costretto a eseguire avidamente un'operazione dopo l'altra, senza sapere cosa verrà dopo.
Puoi compilare in un grafico statico qualsiasi funzione che accetta tensori come input. Basta aggiungere un @tf.function
decoratore su di esso, in questo modo:
@tf.function
def train_step(x, y):
with tf.GradientTape() as tape:
logits = model(x, training=True)
loss_value = loss_fn(y, logits)
grads = tape.gradient(loss_value, model.trainable_weights)
optimizer.apply_gradients(zip(grads, model.trainable_weights))
train_acc_metric.update_state(y, logits)
return loss_value
Facciamo lo stesso con la fase di valutazione:
@tf.function
def test_step(x, y):
val_logits = model(x, training=False)
val_acc_metric.update_state(y, val_logits)
Ora, ripetiamo il nostro ciclo di addestramento con questo passaggio di addestramento compilato:
import time
epochs = 2
for epoch in range(epochs):
print("\nStart of epoch %d" % (epoch,))
start_time = time.time()
# Iterate over the batches of the dataset.
for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
loss_value = train_step(x_batch_train, y_batch_train)
# Log every 200 batches.
if step % 200 == 0:
print(
"Training loss (for one batch) at step %d: %.4f"
% (step, float(loss_value))
)
print("Seen so far: %d samples" % ((step + 1) * batch_size))
# Display metrics at the end of each epoch.
train_acc = train_acc_metric.result()
print("Training acc over epoch: %.4f" % (float(train_acc),))
# Reset training metrics at the end of each epoch
train_acc_metric.reset_states()
# Run a validation loop at the end of each epoch.
for x_batch_val, y_batch_val in val_dataset:
test_step(x_batch_val, y_batch_val)
val_acc = val_acc_metric.result()
val_acc_metric.reset_states()
print("Validation acc: %.4f" % (float(val_acc),))
print("Time taken: %.2fs" % (time.time() - start_time))
Start of epoch 0 Training loss (for one batch) at step 0: 0.7921 Seen so far: 64 samples Training loss (for one batch) at step 200: 0.7755 Seen so far: 12864 samples Training loss (for one batch) at step 400: 0.1564 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.3181 Seen so far: 38464 samples Training acc over epoch: 0.8788 Validation acc: 0.8866 Time taken: 1.59s Start of epoch 1 Training loss (for one batch) at step 0: 0.5222 Seen so far: 64 samples Training loss (for one batch) at step 200: 0.4574 Seen so far: 12864 samples Training loss (for one batch) at step 400: 0.4035 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.7561 Seen so far: 38464 samples Training acc over epoch: 0.8959 Validation acc: 0.9028 Time taken: 1.27s
Molto più veloce, no?
Gestione di basso livello delle perdite tracciate dal modello
Livelli e modelli ricorsivamente monitorare eventuali perdite creati durante il passo in avanti da strati quella chiamata self.add_loss(value)
. L'elenco risultante dei valori di perdita scalari sono disponibili tramite la proprietà model.losses
alla fine del passo in avanti.
Se vuoi utilizzare questi componenti di perdita, dovresti sommarli e aggiungerli alla perdita principale nella tua fase di allenamento.
Considera questo livello, che crea una perdita di regolarizzazione dell'attività:
class ActivityRegularizationLayer(layers.Layer):
def call(self, inputs):
self.add_loss(1e-2 * tf.reduce_sum(inputs))
return inputs
Costruiamo un modello davvero semplice che lo utilizzi:
inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu")(inputs)
# Insert activity regularization as a layer
x = ActivityRegularizationLayer()(x)
x = layers.Dense(64, activation="relu")(x)
outputs = layers.Dense(10, name="predictions")(x)
model = keras.Model(inputs=inputs, outputs=outputs)
Ecco come dovrebbe essere il nostro passaggio di formazione ora:
@tf.function
def train_step(x, y):
with tf.GradientTape() as tape:
logits = model(x, training=True)
loss_value = loss_fn(y, logits)
# Add any extra losses created during the forward pass.
loss_value += sum(model.losses)
grads = tape.gradient(loss_value, model.trainable_weights)
optimizer.apply_gradients(zip(grads, model.trainable_weights))
train_acc_metric.update_state(y, logits)
return loss_value
Riepilogo
Ora sai tutto quello che c'è da sapere sull'utilizzo di cicli di allenamento integrati e sulla scrittura da zero.
Per concludere, ecco un semplice esempio end-to-end che collega tutto ciò che hai imparato in questa guida: un DCGAN addestrato sulle cifre MNIST.
Esempio end-to-end: un ciclo di formazione GAN da zero
Potresti avere familiarità con i GAN (Generative Adversarial Network). I GAN possono generare nuove immagini che sembrano quasi reali, imparando la distribuzione latente di un set di dati di addestramento di immagini (lo "spazio latente" delle immagini).
Un GAN è composto da due parti: un modello "generatore" che mappa i punti nello spazio latente in punti nello spazio immagine, un modello "discriminatore", un classificatore che può distinguere tra immagini reali (dal set di dati di addestramento) e false immagini (l'output della rete del generatore).
Un ciclo di allenamento GAN ha il seguente aspetto:
1) Formare il discriminatore. - Campiona un batch di punti casuali nello spazio latente. - Trasforma i punti in immagini false tramite il modello "generatore". - Ottieni una serie di immagini reali e combinale con le immagini generate. - Addestrare il modello "discriminatore" per classificare le immagini generate rispetto a quelle reali.
2) Addestrare il generatore. - Campiona punti casuali nello spazio latente. - Trasforma i punti in immagini false tramite la rete "generatore". - Ottieni una serie di immagini reali e combinale con le immagini generate. - Addestrare il modello "generatore" per "ingannare" il discriminatore e classificare le immagini false come reali.
Per una panoramica più dettagliata di come funziona Gans, vedere Deep Learning con Python .
Implementiamo questo ciclo di formazione. Innanzitutto, crea il discriminatore destinato a classificare le cifre false rispetto a quelle reali:
discriminator = keras.Sequential(
[
keras.Input(shape=(28, 28, 1)),
layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.GlobalMaxPooling2D(),
layers.Dense(1),
],
name="discriminator",
)
discriminator.summary()
Model: "discriminator" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= conv2d (Conv2D) (None, 14, 14, 64) 640 _________________________________________________________________ leaky_re_lu (LeakyReLU) (None, 14, 14, 64) 0 _________________________________________________________________ conv2d_1 (Conv2D) (None, 7, 7, 128) 73856 _________________________________________________________________ leaky_re_lu_1 (LeakyReLU) (None, 7, 7, 128) 0 _________________________________________________________________ global_max_pooling2d (Global (None, 128) 0 _________________________________________________________________ dense_4 (Dense) (None, 1) 129 ================================================================= Total params: 74,625 Trainable params: 74,625 Non-trainable params: 0 _________________________________________________________________
Poi creiamo una rete generatore, che trasforma vettori latenti in uscite di forma (28, 28, 1)
(rappresentante cifre MNIST):
latent_dim = 128
generator = keras.Sequential(
[
keras.Input(shape=(latent_dim,)),
# We want to generate 128 coefficients to reshape into a 7x7x128 map
layers.Dense(7 * 7 * 128),
layers.LeakyReLU(alpha=0.2),
layers.Reshape((7, 7, 128)),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
],
name="generator",
)
Ecco il pezzo chiave: il ciclo di allenamento. Come puoi vedere è abbastanza semplice. La funzione della fase di allenamento richiede solo 17 righe.
# Instantiate one optimizer for the discriminator and another for the generator.
d_optimizer = keras.optimizers.Adam(learning_rate=0.0003)
g_optimizer = keras.optimizers.Adam(learning_rate=0.0004)
# Instantiate a loss function.
loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)
@tf.function
def train_step(real_images):
# Sample random points in the latent space
random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
# Decode them to fake images
generated_images = generator(random_latent_vectors)
# Combine them with real images
combined_images = tf.concat([generated_images, real_images], axis=0)
# Assemble labels discriminating real from fake images
labels = tf.concat(
[tf.ones((batch_size, 1)), tf.zeros((real_images.shape[0], 1))], axis=0
)
# Add random noise to the labels - important trick!
labels += 0.05 * tf.random.uniform(labels.shape)
# Train the discriminator
with tf.GradientTape() as tape:
predictions = discriminator(combined_images)
d_loss = loss_fn(labels, predictions)
grads = tape.gradient(d_loss, discriminator.trainable_weights)
d_optimizer.apply_gradients(zip(grads, discriminator.trainable_weights))
# Sample random points in the latent space
random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
# Assemble labels that say "all real images"
misleading_labels = tf.zeros((batch_size, 1))
# Train the generator (note that we should *not* update the weights
# of the discriminator)!
with tf.GradientTape() as tape:
predictions = discriminator(generator(random_latent_vectors))
g_loss = loss_fn(misleading_labels, predictions)
grads = tape.gradient(g_loss, generator.trainable_weights)
g_optimizer.apply_gradients(zip(grads, generator.trainable_weights))
return d_loss, g_loss, generated_images
Alleniamoci nostra GAN, ripetutamente chiamando train_step
su gruppi di immagini.
Poiché il nostro discriminatore e generatore sono convnet, ti consigliamo di eseguire questo codice su una GPU.
import os
# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)
epochs = 1 # In practice you need at least 20 epochs to generate nice digits.
save_dir = "./"
for epoch in range(epochs):
print("\nStart epoch", epoch)
for step, real_images in enumerate(dataset):
# Train the discriminator & generator on one batch of real images.
d_loss, g_loss, generated_images = train_step(real_images)
# Logging.
if step % 200 == 0:
# Print metrics
print("discriminator loss at step %d: %.2f" % (step, d_loss))
print("adversarial loss at step %d: %.2f" % (step, g_loss))
# Save one generated image
img = tf.keras.preprocessing.image.array_to_img(
generated_images[0] * 255.0, scale=False
)
img.save(os.path.join(save_dir, "generated_img" + str(step) + ".png"))
# To limit execution time we stop after 10 steps.
# Remove the lines below to actually train the model!
if step > 10:
break
Start epoch 0 discriminator loss at step 0: 0.69 adversarial loss at step 0: 0.69
Questo è tutto! Otterrai cifre MNIST false di bell'aspetto dopo soli ~ 30 secondi di allenamento sulla GPU Colab.