Visualizza su TensorFlow.org | Esegui in Google Colab | Visualizza l'origine su GitHub | Scarica quaderno |
introduzione
Quando si sta facendo apprendimento supervisionato, è possibile utilizzare fit()
e tutto funziona senza intoppi.
Quando è necessario scrivere il proprio ciclo di formazione da zero, è possibile utilizzare il GradientTape
e prendere il controllo di ogni piccolo dettaglio.
Ma cosa succede se avete bisogno di un algoritmo di training personalizzato, ma si desidera comunque beneficiare delle pratiche funzioni di fit()
, come callback, il supporto integrato di distribuzione, o fase di fusione?
Un principio chiave di Keras è progressiva divulgazione di complessità. Dovresti sempre essere in grado di entrare nei flussi di lavoro di livello inferiore in modo graduale. Non dovresti cadere da un dirupo se la funzionalità di alto livello non corrisponde esattamente al tuo caso d'uso. Dovresti essere in grado di ottenere un maggiore controllo sui piccoli dettagli pur mantenendo una quantità proporzionata di praticità di alto livello.
Quando hai bisogno di personalizzare ciò che fit()
lo fa, si dovrebbe ignorare la funzione fase di formazione del Model
di classe. Questa è la funzione che viene chiamato dal fit()
per ogni serie di dati. Sarà quindi in grado di chiamare fit()
come al solito - e sarà in esecuzione il proprio algoritmo di apprendimento.
Tieni presente che questo modello non ti impedisce di creare modelli con l'API funzionale. È possibile farlo se si sta costruendo Sequential
modelli, modelli funzionali API, o modelli sottoclasse.
Vediamo come funziona.
Impostare
Richiede TensorFlow 2.2 o successivo.
import tensorflow as tf
from tensorflow import keras
Un primo semplice esempio
Partiamo da un semplice esempio:
- Creiamo una nuova classe che le sottoclassi
keras.Model
. - Abbiamo appena sovrascrivere il metodo
train_step(self, data)
. - Restituiamo un dizionario che mappa i nomi delle metriche (inclusa la perdita) al loro valore corrente.
L'argomento di input data
è ciò che viene passato per adattarsi come dati di allenamento:
- Se si passa le matrici NumPy, chiamando
fit(x, y, ...)
, alloradata
saranno la tupla(x, y)
- Se si passa un
tf.data.Dataset
, chiamandofit(dataset, ...)
, alloradata
saranno ciò che viene prodotto daidataset
in ogni lotto.
Nel corpo del train_step
metodo, implementiamo un aggiornamento allenamento regolare, simile a quello che si ha già familiarità con. È importante sottolineare che, calcoliamo la perdita tramite self.compiled_loss
, che avvolge la funzione di perdita (es) (s) che sono stati passati alla compile()
.
Analogamente, chiamiamo self.compiled_metrics.update_state(y, y_pred)
per aggiornare lo stato dei parametri che sono stati passati in compile()
, e di query risultati self.metrics
alla fine di recuperare il loro valore corrente.
class CustomModel(keras.Model):
def train_step(self, data):
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute the loss value
# (the loss function is configured in `compile()`)
loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update metrics (includes the metric that tracks the loss)
self.compiled_metrics.update_state(y, y_pred)
# Return a dict mapping metric names to current value
return {m.name: m.result() for m in self.metrics}
Proviamo questo:
import numpy as np
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3 32/32 [==============================] - 1s 2ms/step - loss: 0.9909 - mae: 0.8601 Epoch 2/3 32/32 [==============================] - 0s 2ms/step - loss: 0.4363 - mae: 0.5345 Epoch 3/3 32/32 [==============================] - 0s 2ms/step - loss: 0.2906 - mae: 0.4311 <keras.callbacks.History at 0x7f5ad1ca1090>
Andare di livello inferiore
Naturalmente, si può solo saltare passando una funzione di perdita in compile()
, e invece fare tutto manualmente in train_step
. Allo stesso modo per le metriche.
Ecco un esempio di livello inferiore, che utilizza solo compile()
per configurare l'ottimizzatore:
- Iniziamo con la creazione di
Metric
casi per monitorare la nostra perdita e un punteggio MAE. - Noi attuare una consuetudine
train_step()
che aggiorna lo stato di queste metriche (chiamandoupdate_state()
su di loro), poi le query (tramiteresult()
) per restituire il valore medio della corrente, da visualizzare sulla barra di avanzamento e di essere passare a qualsiasi richiamata. - Nota che avremmo avuto bisogno di chiamare
reset_states()
sulle nostre metriche tra ogni epoca! In caso contrario, chiamandoresult()
restituirebbe una media dall'inizio della formazione, mentre noi di solito lavoro con le medie per-epoca. Fortunatamente, il quadro può farlo per noi: basta elencare qualsiasi metrica che si desidera ripristinare nelmetrics
di proprietà del modello. Il modello chiameràreset_states()
su qualsiasi oggetto elencato qui all'inizio di ognifit()
epoca o all'inizio di una chiamata aevaluate()
.
loss_tracker = keras.metrics.Mean(name="loss")
mae_metric = keras.metrics.MeanAbsoluteError(name="mae")
class CustomModel(keras.Model):
def train_step(self, data):
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute our own loss
loss = keras.losses.mean_squared_error(y, y_pred)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Compute our own metrics
loss_tracker.update_state(loss)
mae_metric.update_state(y, y_pred)
return {"loss": loss_tracker.result(), "mae": mae_metric.result()}
@property
def metrics(self):
# We list our `Metric` objects here so that `reset_states()` can be
# called automatically at the start of each epoch
# or at the start of `evaluate()`.
# If you don't implement this property, you have to call
# `reset_states()` yourself at the time of your choosing.
return [loss_tracker, mae_metric]
# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
# We don't passs a loss or metrics here.
model.compile(optimizer="adam")
# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5 32/32 [==============================] - 0s 1ms/step - loss: 1.5969 - mae: 1.1523 Epoch 2/5 32/32 [==============================] - 0s 1ms/step - loss: 0.7352 - mae: 0.7310 Epoch 3/5 32/32 [==============================] - 0s 1ms/step - loss: 0.3830 - mae: 0.4999 Epoch 4/5 32/32 [==============================] - 0s 1ms/step - loss: 0.2809 - mae: 0.4215 Epoch 5/5 32/32 [==============================] - 0s 1ms/step - loss: 0.2590 - mae: 0.4058 <keras.callbacks.History at 0x7f5ad1b62c50>
Sostenere sample_weight
& class_weight
Potresti aver notato che il nostro primo esempio di base non menzionava la ponderazione del campione. Se si vuole sostenere l' fit()
argomenti sample_weight
e class_weight
, devi semplicemente effettuare le seguenti operazioni:
- Disimballare
sample_weight
dalladata
argomento - Passarlo a
compiled_loss
&compiled_metrics
(naturalmente, si potrebbe anche solo applicarlo manualmente se non si basano sucompile()
per le perdite e metriche) - Questo è tutto. Questa è la lista.
class CustomModel(keras.Model):
def train_step(self, data):
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
if len(data) == 3:
x, y, sample_weight = data
else:
sample_weight = None
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute the loss value.
# The loss function is configured in `compile()`.
loss = self.compiled_loss(
y,
y_pred,
sample_weight=sample_weight,
regularization_losses=self.losses,
)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update the metrics.
# Metrics are configured in `compile()`.
self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)
# Return a dict mapping metric names to current value.
# Note that it will include the loss (tracked in self.metrics).
return {m.name: m.result() for m in self.metrics}
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3 32/32 [==============================] - 0s 2ms/step - loss: 0.1365 - mae: 0.4196 Epoch 2/3 32/32 [==============================] - 0s 2ms/step - loss: 0.1285 - mae: 0.4068 Epoch 3/3 32/32 [==============================] - 0s 2ms/step - loss: 0.1212 - mae: 0.3971 <keras.callbacks.History at 0x7f5ad1ba64d0>
Fornire il proprio passaggio di valutazione
Che cosa succede se si vuole fare lo stesso per le chiamate a model.evaluate()
? Poi si sarebbe ignorare test_step
esattamente nello stesso modo. Ecco come appare:
class CustomModel(keras.Model):
def test_step(self, data):
# Unpack the data
x, y = data
# Compute predictions
y_pred = self(x, training=False)
# Updates the metrics tracking the loss
self.compiled_loss(y, y_pred, regularization_losses=self.losses)
# Update the metrics.
self.compiled_metrics.update_state(y, y_pred)
# Return a dict mapping metric names to current value.
# Note that it will include the loss (tracked in self.metrics).
return {m.name: m.result() for m in self.metrics}
# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])
# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 [==============================] - 0s 1ms/step - loss: 2.7584 - mae: 1.5920 [2.758362054824829, 1.59201979637146]
Conclusione: un esempio GAN end-to-end
Esaminiamo un esempio end-to-end che sfrutta tutto ciò che hai appena appreso.
Consideriamo:
- Una rete di generatori pensata per generare immagini 28x28x1.
- Una rete discriminatoria destinata a classificare le immagini 28x28x1 in due classi ("false" e "reali").
- Un ottimizzatore per ciascuno.
- Una funzione di perdita per addestrare il discriminatore.
from tensorflow.keras import layers
# Create the discriminator
discriminator = keras.Sequential(
[
keras.Input(shape=(28, 28, 1)),
layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.GlobalMaxPooling2D(),
layers.Dense(1),
],
name="discriminator",
)
# Create the generator
latent_dim = 128
generator = keras.Sequential(
[
keras.Input(shape=(latent_dim,)),
# We want to generate 128 coefficients to reshape into a 7x7x128 map
layers.Dense(7 * 7 * 128),
layers.LeakyReLU(alpha=0.2),
layers.Reshape((7, 7, 128)),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
],
name="generator",
)
Ecco una classe GAN feature-complete, ignorando compile()
per utilizzare la propria firma, e l'attuazione l'intero algoritmo GAN in 17 righe in train_step
:
class GAN(keras.Model):
def __init__(self, discriminator, generator, latent_dim):
super(GAN, self).__init__()
self.discriminator = discriminator
self.generator = generator
self.latent_dim = latent_dim
def compile(self, d_optimizer, g_optimizer, loss_fn):
super(GAN, self).compile()
self.d_optimizer = d_optimizer
self.g_optimizer = g_optimizer
self.loss_fn = loss_fn
def train_step(self, real_images):
if isinstance(real_images, tuple):
real_images = real_images[0]
# Sample random points in the latent space
batch_size = tf.shape(real_images)[0]
random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
# Decode them to fake images
generated_images = self.generator(random_latent_vectors)
# Combine them with real images
combined_images = tf.concat([generated_images, real_images], axis=0)
# Assemble labels discriminating real from fake images
labels = tf.concat(
[tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
)
# Add random noise to the labels - important trick!
labels += 0.05 * tf.random.uniform(tf.shape(labels))
# Train the discriminator
with tf.GradientTape() as tape:
predictions = self.discriminator(combined_images)
d_loss = self.loss_fn(labels, predictions)
grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
self.d_optimizer.apply_gradients(
zip(grads, self.discriminator.trainable_weights)
)
# Sample random points in the latent space
random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
# Assemble labels that say "all real images"
misleading_labels = tf.zeros((batch_size, 1))
# Train the generator (note that we should *not* update the weights
# of the discriminator)!
with tf.GradientTape() as tape:
predictions = self.discriminator(self.generator(random_latent_vectors))
g_loss = self.loss_fn(misleading_labels, predictions)
grads = tape.gradient(g_loss, self.generator.trainable_weights)
self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
return {"d_loss": d_loss, "g_loss": g_loss}
Proviamolo:
# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)
gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)
# To limit the execution time, we only train on 100 batches. You can train on
# the entire dataset. You will need about 20 epochs to get nice results.
gan.fit(dataset.take(100), epochs=1)
100/100 [==============================] - 3s 11ms/step - d_loss: 0.4031 - g_loss: 0.9305 <keras.callbacks.History at 0x7f5ad1b37c50>
Le idee alla base del deep learning sono semplici, quindi perché la loro implementazione dovrebbe essere dolorosa?