Ver no TensorFlow.org | Executar no Google Colab | Ver fonte no GitHub | Baixar caderno |
Configurar
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
Introdução
Keras oferece treinamento padrão e loops de avaliação, fit()
e evaluate()
. Seu uso é coberto na guia Treinamento e avaliação com os métodos embutidos .
Se você quiser personalizar o algoritmo de aprendizagem do seu modelo, enquanto ainda aproveitar a conveniência de fit()
(por exemplo, para treinar um GAN utilizando fit()
), você pode subclasse o Model
de classe e implementar seu próprio train_step()
método, que é chamado repetidamente durante fit()
. Isso é abordado na guia Personalizar o que acontece no fit()
.
Agora, se você deseja um controle de nível muito baixo sobre o treinamento e avaliação, deve escrever seus próprios loops de treinamento e avaliação do zero. É disso que trata este guia.
Usando o GradientTape
: um primeiro exemplo de ponta-a-ponta
Chamar um modelo dentro de um GradientTape
âmbito permite recuperar os gradientes dos pesos treináveis da camada em relação a um valor de perda. Usando uma instância otimizador, você pode usar esses gradientes de atualizar essas variáveis (que você pode recuperar usando model.trainable_weights
).
Vamos considerar um modelo MNIST simples:
inputs = keras.Input(shape=(784,), name="digits")
x1 = layers.Dense(64, activation="relu")(inputs)
x2 = layers.Dense(64, activation="relu")(x1)
outputs = layers.Dense(10, name="predictions")(x2)
model = keras.Model(inputs=inputs, outputs=outputs)
Vamos treiná-lo usando gradiente de minilote com um loop de treinamento personalizado.
Primeiro, vamos precisar de um otimizador, uma função de perda e um conjunto de dados:
# Instantiate an optimizer.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# Prepare the training dataset.
batch_size = 64
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
x_train = np.reshape(x_train, (-1, 784))
x_test = np.reshape(x_test, (-1, 784))
# Reserve 10,000 samples for validation.
x_val = x_train[-10000:]
y_val = y_train[-10000:]
x_train = x_train[:-10000]
y_train = y_train[:-10000]
# Prepare the training dataset.
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)
# Prepare the validation dataset.
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
val_dataset = val_dataset.batch(batch_size)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz 11493376/11490434 [==============================] - 1s 0us/step 11501568/11490434 [==============================] - 1s 0us/step
Aqui está nosso loop de treinamento:
- Abrimos um
for
loop que itera sobre épocas - Para cada época, abrimos uma
for
loop que itera sobre o conjunto de dados, em lotes - Para cada lote, abrimos um
GradientTape()
escopo - Dentro deste escopo, chamamos o modelo (passagem para frente) e calculamos a perda
- Fora do escopo, recuperamos os gradientes dos pesos do modelo em relação à perda
- Finalmente, usamos o otimizador para atualizar os pesos do modelo com base nos gradientes
epochs = 2
for epoch in range(epochs):
print("\nStart of epoch %d" % (epoch,))
# Iterate over the batches of the dataset.
for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
# Open a GradientTape to record the operations run
# during the forward pass, which enables auto-differentiation.
with tf.GradientTape() as tape:
# Run the forward pass of the layer.
# The operations that the layer applies
# to its inputs are going to be recorded
# on the GradientTape.
logits = model(x_batch_train, training=True) # Logits for this minibatch
# Compute the loss value for this minibatch.
loss_value = loss_fn(y_batch_train, logits)
# Use the gradient tape to automatically retrieve
# the gradients of the trainable variables with respect to the loss.
grads = tape.gradient(loss_value, model.trainable_weights)
# Run one step of gradient descent by updating
# the value of the variables to minimize the loss.
optimizer.apply_gradients(zip(grads, model.trainable_weights))
# Log every 200 batches.
if step % 200 == 0:
print(
"Training loss (for one batch) at step %d: %.4f"
% (step, float(loss_value))
)
print("Seen so far: %s samples" % ((step + 1) * batch_size))
Start of epoch 0 Training loss (for one batch) at step 0: 68.7478 Seen so far: 64 samples Training loss (for one batch) at step 200: 1.9448 Seen so far: 12864 samples Training loss (for one batch) at step 400: 1.1859 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.6914 Seen so far: 38464 samples Start of epoch 1 Training loss (for one batch) at step 0: 0.9113 Seen so far: 64 samples Training loss (for one batch) at step 200: 0.9550 Seen so far: 12864 samples Training loss (for one batch) at step 400: 0.5139 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.7227 Seen so far: 38464 samples
Tratamento de baixo nível de métricas
Vamos adicionar monitoramento de métricas a este loop básico.
Você pode reutilizar prontamente as métricas integradas (ou personalizadas que você escreveu) em tais loops de treinamento escritos do zero. Este é o fluxo:
- Instancie a métrica no início do loop
- Chamada
metric.update_state()
após cada lote - Chamada
metric.result()
quando você precisa exibir o valor atual da métrica - Chamada
metric.reset_states()
quando você precisa limpar o estado da métrica (normalmente no final de uma época)
Vamos usar esse conhecimento para calcular SparseCategoricalAccuracy
em dados de validação, no final de cada época:
# Get model
inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu", name="dense_1")(inputs)
x = layers.Dense(64, activation="relu", name="dense_2")(x)
outputs = layers.Dense(10, name="predictions")(x)
model = keras.Model(inputs=inputs, outputs=outputs)
# Instantiate an optimizer to train the model.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)
# Instantiate a loss function.
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# Prepare the metrics.
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc_metric = keras.metrics.SparseCategoricalAccuracy()
Este é o nosso ciclo de treinamento e avaliação:
import time
epochs = 2
for epoch in range(epochs):
print("\nStart of epoch %d" % (epoch,))
start_time = time.time()
# Iterate over the batches of the dataset.
for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
with tf.GradientTape() as tape:
logits = model(x_batch_train, training=True)
loss_value = loss_fn(y_batch_train, logits)
grads = tape.gradient(loss_value, model.trainable_weights)
optimizer.apply_gradients(zip(grads, model.trainable_weights))
# Update training metric.
train_acc_metric.update_state(y_batch_train, logits)
# Log every 200 batches.
if step % 200 == 0:
print(
"Training loss (for one batch) at step %d: %.4f"
% (step, float(loss_value))
)
print("Seen so far: %d samples" % ((step + 1) * batch_size))
# Display metrics at the end of each epoch.
train_acc = train_acc_metric.result()
print("Training acc over epoch: %.4f" % (float(train_acc),))
# Reset training metrics at the end of each epoch
train_acc_metric.reset_states()
# Run a validation loop at the end of each epoch.
for x_batch_val, y_batch_val in val_dataset:
val_logits = model(x_batch_val, training=False)
# Update val metrics
val_acc_metric.update_state(y_batch_val, val_logits)
val_acc = val_acc_metric.result()
val_acc_metric.reset_states()
print("Validation acc: %.4f" % (float(val_acc),))
print("Time taken: %.2fs" % (time.time() - start_time))
Start of epoch 0 Training loss (for one batch) at step 0: 88.9958 Seen so far: 64 samples Training loss (for one batch) at step 200: 2.2214 Seen so far: 12864 samples Training loss (for one batch) at step 400: 1.3083 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.8282 Seen so far: 38464 samples Training acc over epoch: 0.7406 Validation acc: 0.8201 Time taken: 6.31s Start of epoch 1 Training loss (for one batch) at step 0: 0.3276 Seen so far: 64 samples Training loss (for one batch) at step 200: 0.4819 Seen so far: 12864 samples Training loss (for one batch) at step 400: 0.5971 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.5862 Seen so far: 38464 samples Training acc over epoch: 0.8474 Validation acc: 0.8676 Time taken: 5.98s
Aceleração sua etapa de treinamento com tf.function
O tempo de execução padrão no TensorFlow 2 é execução ansioso . Como tal, nosso loop de treinamento acima executa com entusiasmo.
Isso é ótimo para depuração, mas a compilação de gráfico tem uma vantagem de desempenho definitiva. Descrever seu cálculo como um gráfico estático permite que a estrutura aplique otimizações de desempenho global. Isso é impossível quando a estrutura é obrigada a executar avidamente uma operação após a outra, sem nenhum conhecimento do que vem a seguir.
Você pode compilar em um gráfico estático qualquer função que receba tensores como entrada. Basta adicionar um @tf.function
decorador sobre ele, como este:
@tf.function
def train_step(x, y):
with tf.GradientTape() as tape:
logits = model(x, training=True)
loss_value = loss_fn(y, logits)
grads = tape.gradient(loss_value, model.trainable_weights)
optimizer.apply_gradients(zip(grads, model.trainable_weights))
train_acc_metric.update_state(y, logits)
return loss_value
Vamos fazer o mesmo com a etapa de avaliação:
@tf.function
def test_step(x, y):
val_logits = model(x, training=False)
val_acc_metric.update_state(y, val_logits)
Agora, vamos executar novamente nosso loop de treinamento com esta etapa de treinamento compilada:
import time
epochs = 2
for epoch in range(epochs):
print("\nStart of epoch %d" % (epoch,))
start_time = time.time()
# Iterate over the batches of the dataset.
for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
loss_value = train_step(x_batch_train, y_batch_train)
# Log every 200 batches.
if step % 200 == 0:
print(
"Training loss (for one batch) at step %d: %.4f"
% (step, float(loss_value))
)
print("Seen so far: %d samples" % ((step + 1) * batch_size))
# Display metrics at the end of each epoch.
train_acc = train_acc_metric.result()
print("Training acc over epoch: %.4f" % (float(train_acc),))
# Reset training metrics at the end of each epoch
train_acc_metric.reset_states()
# Run a validation loop at the end of each epoch.
for x_batch_val, y_batch_val in val_dataset:
test_step(x_batch_val, y_batch_val)
val_acc = val_acc_metric.result()
val_acc_metric.reset_states()
print("Validation acc: %.4f" % (float(val_acc),))
print("Time taken: %.2fs" % (time.time() - start_time))
Start of epoch 0 Training loss (for one batch) at step 0: 0.7921 Seen so far: 64 samples Training loss (for one batch) at step 200: 0.7755 Seen so far: 12864 samples Training loss (for one batch) at step 400: 0.1564 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.3181 Seen so far: 38464 samples Training acc over epoch: 0.8788 Validation acc: 0.8866 Time taken: 1.59s Start of epoch 1 Training loss (for one batch) at step 0: 0.5222 Seen so far: 64 samples Training loss (for one batch) at step 200: 0.4574 Seen so far: 12864 samples Training loss (for one batch) at step 400: 0.4035 Seen so far: 25664 samples Training loss (for one batch) at step 600: 0.7561 Seen so far: 38464 samples Training acc over epoch: 0.8959 Validation acc: 0.9028 Time taken: 1.27s
Muito mais rápido, não é?
Tratamento de baixo nível de perdas rastreadas pelo modelo
Camadas e modelos recursivamente rastrear eventuais perdas criados durante o passe para frente por camadas essa chamada self.add_loss(value)
. A lista resultante dos valores de perda de escalares estão disponíveis através da propriedade model.losses
no final do passe para frente.
Se você quiser usar esses componentes de perda, deve somá-los e adicioná-los à perda principal em sua etapa de treinamento.
Considere esta camada, que cria uma perda de regularização de atividades:
class ActivityRegularizationLayer(layers.Layer):
def call(self, inputs):
self.add_loss(1e-2 * tf.reduce_sum(inputs))
return inputs
Vamos construir um modelo realmente simples que o use:
inputs = keras.Input(shape=(784,), name="digits")
x = layers.Dense(64, activation="relu")(inputs)
# Insert activity regularization as a layer
x = ActivityRegularizationLayer()(x)
x = layers.Dense(64, activation="relu")(x)
outputs = layers.Dense(10, name="predictions")(x)
model = keras.Model(inputs=inputs, outputs=outputs)
Esta é a aparência de nossa etapa de treinamento agora:
@tf.function
def train_step(x, y):
with tf.GradientTape() as tape:
logits = model(x, training=True)
loss_value = loss_fn(y, logits)
# Add any extra losses created during the forward pass.
loss_value += sum(model.losses)
grads = tape.gradient(loss_value, model.trainable_weights)
optimizer.apply_gradients(zip(grads, model.trainable_weights))
train_acc_metric.update_state(y, logits)
return loss_value
Resumo
Agora você sabe tudo o que há para saber sobre como usar loops de treinamento integrados e como escrever seus próprios do zero.
Para concluir, aqui está um exemplo simples de ponta a ponta que reúne tudo o que você aprendeu neste guia: um DCGAN treinado em dígitos MNIST.
Exemplo ponta a ponta: um ciclo de treinamento GAN do zero
Você deve estar familiarizado com Generative Adversarial Networks (GANs). Os GANs podem gerar novas imagens que parecem quase reais, aprendendo a distribuição latente de um conjunto de dados de treinamento de imagens (o "espaço latente" das imagens).
Um GAN é feito de duas partes: um modelo "gerador" que mapeia pontos no espaço latente para pontos no espaço da imagem, um modelo "discriminador", um classificador que pode dizer a diferença entre imagens reais (do conjunto de dados de treinamento) e falsas imagens (a saída da rede do gerador).
Um loop de treinamento GAN se parece com isto:
1) Treine o discriminador. - Faça uma amostra de um lote de pontos aleatórios no espaço latente. - Transforme os pontos em imagens falsas através do modelo "gerador". - Pegue um lote de imagens reais e combine-as com as imagens geradas. - Treine o modelo "discriminador" para classificar imagens geradas vs. reais.
2) Treine o gerador. - Amostra pontos aleatórios no espaço latente. - Transforme os pontos em imagens falsas através da rede "gerador". - Pegue um lote de imagens reais e combine-as com as imagens geradas. - Treine o modelo "gerador" para "enganar" o discriminador e classificar as imagens falsas como reais.
Para uma visão muito mais detalhada de como funciona o GAN, consulte profundo de aprendizagem com Python .
Vamos implementar este loop de treinamento. Primeiro, crie o discriminador destinado a classificar dígitos falsos vs. reais:
discriminator = keras.Sequential(
[
keras.Input(shape=(28, 28, 1)),
layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.GlobalMaxPooling2D(),
layers.Dense(1),
],
name="discriminator",
)
discriminator.summary()
Model: "discriminator" _________________________________________________________________ Layer (type) Output Shape Param # ================================================================= conv2d (Conv2D) (None, 14, 14, 64) 640 _________________________________________________________________ leaky_re_lu (LeakyReLU) (None, 14, 14, 64) 0 _________________________________________________________________ conv2d_1 (Conv2D) (None, 7, 7, 128) 73856 _________________________________________________________________ leaky_re_lu_1 (LeakyReLU) (None, 7, 7, 128) 0 _________________________________________________________________ global_max_pooling2d (Global (None, 128) 0 _________________________________________________________________ dense_4 (Dense) (None, 1) 129 ================================================================= Total params: 74,625 Trainable params: 74,625 Non-trainable params: 0 _________________________________________________________________
Em seguida, vamos criar uma rede gerador, que transforma vetores latentes em saídas de forma (28, 28, 1)
(representando dígitos MNIST):
latent_dim = 128
generator = keras.Sequential(
[
keras.Input(shape=(latent_dim,)),
# We want to generate 128 coefficients to reshape into a 7x7x128 map
layers.Dense(7 * 7 * 128),
layers.LeakyReLU(alpha=0.2),
layers.Reshape((7, 7, 128)),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
],
name="generator",
)
Aqui está a parte principal: o loop de treinamento. Como você pode ver, é bastante simples. A função de etapa de treinamento leva apenas 17 linhas.
# Instantiate one optimizer for the discriminator and another for the generator.
d_optimizer = keras.optimizers.Adam(learning_rate=0.0003)
g_optimizer = keras.optimizers.Adam(learning_rate=0.0004)
# Instantiate a loss function.
loss_fn = keras.losses.BinaryCrossentropy(from_logits=True)
@tf.function
def train_step(real_images):
# Sample random points in the latent space
random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
# Decode them to fake images
generated_images = generator(random_latent_vectors)
# Combine them with real images
combined_images = tf.concat([generated_images, real_images], axis=0)
# Assemble labels discriminating real from fake images
labels = tf.concat(
[tf.ones((batch_size, 1)), tf.zeros((real_images.shape[0], 1))], axis=0
)
# Add random noise to the labels - important trick!
labels += 0.05 * tf.random.uniform(labels.shape)
# Train the discriminator
with tf.GradientTape() as tape:
predictions = discriminator(combined_images)
d_loss = loss_fn(labels, predictions)
grads = tape.gradient(d_loss, discriminator.trainable_weights)
d_optimizer.apply_gradients(zip(grads, discriminator.trainable_weights))
# Sample random points in the latent space
random_latent_vectors = tf.random.normal(shape=(batch_size, latent_dim))
# Assemble labels that say "all real images"
misleading_labels = tf.zeros((batch_size, 1))
# Train the generator (note that we should *not* update the weights
# of the discriminator)!
with tf.GradientTape() as tape:
predictions = discriminator(generator(random_latent_vectors))
g_loss = loss_fn(misleading_labels, predictions)
grads = tape.gradient(g_loss, generator.trainable_weights)
g_optimizer.apply_gradients(zip(grads, generator.trainable_weights))
return d_loss, g_loss, generated_images
Vamos treinar nosso GAN, chamando repetidamente train_step
em lotes de imagens.
Já que nosso discriminador e gerador são convnets, você vai querer rodar este código em uma GPU.
import os
# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)
epochs = 1 # In practice you need at least 20 epochs to generate nice digits.
save_dir = "./"
for epoch in range(epochs):
print("\nStart epoch", epoch)
for step, real_images in enumerate(dataset):
# Train the discriminator & generator on one batch of real images.
d_loss, g_loss, generated_images = train_step(real_images)
# Logging.
if step % 200 == 0:
# Print metrics
print("discriminator loss at step %d: %.2f" % (step, d_loss))
print("adversarial loss at step %d: %.2f" % (step, g_loss))
# Save one generated image
img = tf.keras.preprocessing.image.array_to_img(
generated_images[0] * 255.0, scale=False
)
img.save(os.path.join(save_dir, "generated_img" + str(step) + ".png"))
# To limit execution time we stop after 10 steps.
# Remove the lines below to actually train the model!
if step > 10:
break
Start epoch 0 discriminator loss at step 0: 0.69 adversarial loss at step 0: 0.69
É isso! Você obterá dígitos MNIST falsos de boa aparência depois de apenas ~ 30s de treinamento na GPU Colab.