Ver en TensorFlow.org | Ejecutar en Google Colab | Ver fuente en GitHub | Descargar cuaderno |
Introducción
Cuando estás haciendo el aprendizaje supervisado, se puede utilizar fit()
y todo funciona sin problemas.
Cuando se necesita para escribir su propio bucle de formación a partir de cero, se puede utilizar el GradientTape
y tomar el control de todos los detalles.
Pero lo que si usted necesita un algoritmo de entrenamiento a medida, pero todavía quiere beneficiarse de las características convenientes de fit()
, tales como las devoluciones de llamada, una función de apoyo a la distribución, o la fusión paso?
Un principio básico de Keras es la revelación progresiva de la complejidad. Siempre debe poder ingresar a flujos de trabajo de nivel inferior de manera gradual. No debe caer por un precipicio si la funcionalidad de alto nivel no coincide exactamente con su caso de uso. Debería poder obtener más control sobre los pequeños detalles mientras conserva una cantidad proporcional de conveniencia de alto nivel.
Cuando tenga que personalizar lo que fit()
lleva, deberá anular la función de paso de formación del Model
de clases. Esta es la función que se llama por fit()
para cada lote de datos. A continuación, será capaz de llamada de fit()
, como de costumbre - y se va a ejecutar su propio algoritmo de aprendizaje.
Tenga en cuenta que este patrón no le impide crear modelos con la API funcional. Usted puede hacer esto si usted está construyendo Sequential
modelos, modelos funcionales de la API, o modelos subclases.
Veamos cómo funciona.
Configuración
Requiere TensorFlow 2.2 o posterior.
import tensorflow as tf
from tensorflow import keras
Un primer ejemplo sencillo
Comencemos con un ejemplo simple:
- Creamos una nueva clase que subclases
keras.Model
. - Acabamos de reemplazar el método
train_step(self, data)
. - Devolvemos los nombres de las métricas de asignación de un diccionario (incluida la pérdida) a su valor actual.
El argumento de entrada data
es lo que se pasa a encajar como datos de entrenamiento:
- Si pasa matrices numpy, llamando
fit(x, y, ...)
, a continuación,data
serán la tupla(x, y)
- Si pasa un
tf.data.Dataset
, llamandofit(dataset, ...)
, a continuación,data
serán lo que se produjo por eldataset
en cada lote.
En el cuerpo de la train_step
método, se implementa una actualización de la formación regular, similar a lo que ya está familiarizado con. Es importante destacar que, se calcula la pérdida a través de self.compiled_loss
, que envuelve la función de pérdida (s) (s) que se pasa al compile()
.
Del mismo modo, llamamos self.compiled_metrics.update_state(y, y_pred)
para actualizar el estado de los indicadores que se aprobaron en compile()
, y los resultados de la consulta self.metrics
al final recuperamos a su valor actual.
class CustomModel(keras.Model):
def train_step(self, data):
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute the loss value
# (the loss function is configured in `compile()`)
loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update metrics (includes the metric that tracks the loss)
self.compiled_metrics.update_state(y, y_pred)
# Return a dict mapping metric names to current value
return {m.name: m.result() for m in self.metrics}
Probemos esto:
import numpy as np
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3 32/32 [==============================] - 1s 2ms/step - loss: 0.9909 - mae: 0.8601 Epoch 2/3 32/32 [==============================] - 0s 2ms/step - loss: 0.4363 - mae: 0.5345 Epoch 3/3 32/32 [==============================] - 0s 2ms/step - loss: 0.2906 - mae: 0.4311 <keras.callbacks.History at 0x7f5ad1ca1090>
Yendo a un nivel inferior
Naturalmente, usted podría saltar pasando una función de pérdida de compile()
, y en lugar de hacer todo manualmente en train_step
. Lo mismo ocurre con las métricas.
He aquí un ejemplo de nivel inferior, que sólo utiliza compile()
para configurar el optimizador:
- Comenzamos creando
Metric
instancias para realizar un seguimiento de nuestra pérdida y una puntuación MAE. - Ponemos en práctica una costumbre
train_step()
que actualiza el estado de estas métricas (llamandoupdate_state()
sobre ellos), entonces se le pregunta ellos (a través deresult()
) para devolver su valor medio actual, que se muestra por la barra de progreso y para ser pasar a cualquier devolución de llamada. - Tenga en cuenta que tendríamos que llamar
reset_states()
en nuestras métricas entre cada época! De lo contrario llamarresult()
devolvería un promedio desde el inicio de la formación, mientras que por lo general trabajan con promedios por cada época. Afortunadamente, el marco puede hacer eso por nosotros: simplemente indica alguna métrica que desea restablecer en elmetrics
propiedad del modelo. El modelo llamaráreset_states()
en cualquier objeto que aparece aquí en el comienzo de cadafit()
época o al comienzo de una llamada aevaluate()
.
loss_tracker = keras.metrics.Mean(name="loss")
mae_metric = keras.metrics.MeanAbsoluteError(name="mae")
class CustomModel(keras.Model):
def train_step(self, data):
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute our own loss
loss = keras.losses.mean_squared_error(y, y_pred)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Compute our own metrics
loss_tracker.update_state(loss)
mae_metric.update_state(y, y_pred)
return {"loss": loss_tracker.result(), "mae": mae_metric.result()}
@property
def metrics(self):
# We list our `Metric` objects here so that `reset_states()` can be
# called automatically at the start of each epoch
# or at the start of `evaluate()`.
# If you don't implement this property, you have to call
# `reset_states()` yourself at the time of your choosing.
return [loss_tracker, mae_metric]
# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
# We don't passs a loss or metrics here.
model.compile(optimizer="adam")
# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5 32/32 [==============================] - 0s 1ms/step - loss: 1.5969 - mae: 1.1523 Epoch 2/5 32/32 [==============================] - 0s 1ms/step - loss: 0.7352 - mae: 0.7310 Epoch 3/5 32/32 [==============================] - 0s 1ms/step - loss: 0.3830 - mae: 0.4999 Epoch 4/5 32/32 [==============================] - 0s 1ms/step - loss: 0.2809 - mae: 0.4215 Epoch 5/5 32/32 [==============================] - 0s 1ms/step - loss: 0.2590 - mae: 0.4058 <keras.callbacks.History at 0x7f5ad1b62c50>
Apoyando sample_weight
y class_weight
Es posible que haya notado que nuestro primer ejemplo básico no mencionó la ponderación de la muestra. Si quieres apoyar el fit()
argumentos sample_weight
y class_weight
, usted sólo tiene que hacer lo siguiente:
- Desempaquetar
sample_weight
Deldata
argumento - Pasarlo a
compiled_loss
ycompiled_metrics
(por supuesto, usted podría también acaba de aplicar de forma manual si no se basan encompile()
por las pérdidas y métricas) - Eso es. Esa es la lista.
class CustomModel(keras.Model):
def train_step(self, data):
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
if len(data) == 3:
x, y, sample_weight = data
else:
sample_weight = None
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute the loss value.
# The loss function is configured in `compile()`.
loss = self.compiled_loss(
y,
y_pred,
sample_weight=sample_weight,
regularization_losses=self.losses,
)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update the metrics.
# Metrics are configured in `compile()`.
self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)
# Return a dict mapping metric names to current value.
# Note that it will include the loss (tracked in self.metrics).
return {m.name: m.result() for m in self.metrics}
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3 32/32 [==============================] - 0s 2ms/step - loss: 0.1365 - mae: 0.4196 Epoch 2/3 32/32 [==============================] - 0s 2ms/step - loss: 0.1285 - mae: 0.4068 Epoch 3/3 32/32 [==============================] - 0s 2ms/step - loss: 0.1212 - mae: 0.3971 <keras.callbacks.History at 0x7f5ad1ba64d0>
Proporcionar su propio paso de evaluación
¿Qué pasa si usted quiere hacer lo mismo para las llamadas a model.evaluate()
? De allí tendría que anular test_step
exactamente de la misma manera. Así es como se ve:
class CustomModel(keras.Model):
def test_step(self, data):
# Unpack the data
x, y = data
# Compute predictions
y_pred = self(x, training=False)
# Updates the metrics tracking the loss
self.compiled_loss(y, y_pred, regularization_losses=self.losses)
# Update the metrics.
self.compiled_metrics.update_state(y, y_pred)
# Return a dict mapping metric names to current value.
# Note that it will include the loss (tracked in self.metrics).
return {m.name: m.result() for m in self.metrics}
# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])
# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 [==============================] - 0s 1ms/step - loss: 2.7584 - mae: 1.5920 [2.758362054824829, 1.59201979637146]
Conclusión: un ejemplo de GAN de un extremo a otro
Analicemos un ejemplo de principio a fin que aprovecha todo lo que acaba de aprender.
Consideremos:
- Una red de generadores destinada a generar imágenes de 28x28x1.
- Una red discriminadora destinada a clasificar imágenes de 28x28x1 en dos clases ("falsas" y "reales").
- Un optimizador para cada uno.
- Una función de pérdida para entrenar al discriminador.
from tensorflow.keras import layers
# Create the discriminator
discriminator = keras.Sequential(
[
keras.Input(shape=(28, 28, 1)),
layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.GlobalMaxPooling2D(),
layers.Dense(1),
],
name="discriminator",
)
# Create the generator
latent_dim = 128
generator = keras.Sequential(
[
keras.Input(shape=(latent_dim,)),
# We want to generate 128 coefficients to reshape into a 7x7x128 map
layers.Dense(7 * 7 * 128),
layers.LeakyReLU(alpha=0.2),
layers.Reshape((7, 7, 128)),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
],
name="generator",
)
Aquí está una clase GAN-función completa, haciendo caso omiso compile()
para usar su propia firma, y la aplicación de todo el algoritmo de GAN en 17 líneas en train_step
:
class GAN(keras.Model):
def __init__(self, discriminator, generator, latent_dim):
super(GAN, self).__init__()
self.discriminator = discriminator
self.generator = generator
self.latent_dim = latent_dim
def compile(self, d_optimizer, g_optimizer, loss_fn):
super(GAN, self).compile()
self.d_optimizer = d_optimizer
self.g_optimizer = g_optimizer
self.loss_fn = loss_fn
def train_step(self, real_images):
if isinstance(real_images, tuple):
real_images = real_images[0]
# Sample random points in the latent space
batch_size = tf.shape(real_images)[0]
random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
# Decode them to fake images
generated_images = self.generator(random_latent_vectors)
# Combine them with real images
combined_images = tf.concat([generated_images, real_images], axis=0)
# Assemble labels discriminating real from fake images
labels = tf.concat(
[tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
)
# Add random noise to the labels - important trick!
labels += 0.05 * tf.random.uniform(tf.shape(labels))
# Train the discriminator
with tf.GradientTape() as tape:
predictions = self.discriminator(combined_images)
d_loss = self.loss_fn(labels, predictions)
grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
self.d_optimizer.apply_gradients(
zip(grads, self.discriminator.trainable_weights)
)
# Sample random points in the latent space
random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
# Assemble labels that say "all real images"
misleading_labels = tf.zeros((batch_size, 1))
# Train the generator (note that we should *not* update the weights
# of the discriminator)!
with tf.GradientTape() as tape:
predictions = self.discriminator(self.generator(random_latent_vectors))
g_loss = self.loss_fn(misleading_labels, predictions)
grads = tape.gradient(g_loss, self.generator.trainable_weights)
self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
return {"d_loss": d_loss, "g_loss": g_loss}
Probémoslo:
# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)
gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)
# To limit the execution time, we only train on 100 batches. You can train on
# the entire dataset. You will need about 20 epochs to get nice results.
gan.fit(dataset.take(100), epochs=1)
100/100 [==============================] - 3s 11ms/step - d_loss: 0.4031 - g_loss: 0.9305 <keras.callbacks.History at 0x7f5ad1b37c50>
Las ideas detrás del aprendizaje profundo son simples, entonces, ¿por qué debería ser dolorosa su implementación?