Ver en TensorFlow.org | Ejecutar en Google Colab | Ver fuente en GitHub | Descargar libreta |
Este tutorial demuestra cómo usar tf.distribute.Strategy
con bucles de entrenamiento personalizados. Entrenaremos un modelo CNN simple en el conjunto de datos MNIST de moda. El conjunto de datos de moda MNIST contiene 60000 imágenes de trenes de tamaño 28 x 28 y 10000 imágenes de prueba de tamaño 28 x 28.
Estamos utilizando bucles de entrenamiento personalizados para entrenar nuestro modelo porque nos brindan flexibilidad y un mayor control sobre el entrenamiento. Además, es más fácil depurar el modelo y el ciclo de entrenamiento.
# Import TensorFlow
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
Descargue el conjunto de datos de moda MNIST
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]
# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
Crea una estrategia para distribuir las variables y el gráfico.
¿Cómo funciona la estrategia tf.distribute.MirroredStrategy
?
- Todas las variables y el gráfico del modelo se replican en las réplicas.
- La entrada se distribuye uniformemente entre las réplicas.
- Cada réplica calcula la pérdida y los gradientes de la entrada que recibió.
- Los gradientes se sincronizan en todas las réplicas al sumarlos.
- Después de la sincronización, se realiza la misma actualización en las copias de las variables en cada réplica.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1
Configurar tubería de entrada
Exporte el gráfico y las variables al formato de modelo guardado independiente de la plataforma. Después de guardar su modelo, puede cargarlo con o sin el alcance.
BUFFER_SIZE = len(train_images)
BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
EPOCHS = 10
Cree los conjuntos de datos y distribúyalos:
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
2022-01-26 05:45:53.991501: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_UINT8 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } dim { size: 1 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_UINT8 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_UINT8 } } } } } 2022-01-26 05:45:54.034762: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_UINT8 } } } attr { key: "_cardinality" value { i: 10000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:3" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } dim { size: 1 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_UINT8 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_UINT8 } } } } }
Crear el modelo
Cree un modelo usando tf.keras.Sequential
. También puede usar la API de subclases de modelos para hacer esto.
def create_model():
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(64, 3, activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])
return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
Definir la función de pérdida
Normalmente, en una sola máquina con 1 GPU/CPU, la pérdida se divide por la cantidad de ejemplos en el lote de entrada.
Entonces, ¿cómo se debe calcular la pérdida cuando se usa una tf.distribute.Strategy
?
Por ejemplo, supongamos que tiene 4 GPU y un tamaño de lote de 64. Un lote de entrada se distribuye entre las réplicas (4 GPU), cada réplica recibe una entrada de tamaño 16.
El modelo en cada réplica hace un pase hacia adelante con su entrada respectiva y calcula la pérdida. Ahora, en lugar de dividir la pérdida por el número de ejemplos en su entrada respectiva (BATCH_SIZE_PER_REPLICA = 16), la pérdida debe dividirse por GLOBAL_BATCH_SIZE (64).
¿Por qué hacer esto?
- Esto debe hacerse porque después de calcular los gradientes en cada réplica, se sincronizan entre las réplicas al sumarlas .
¿Cómo hacer esto en TensorFlow?
Si está escribiendo un ciclo de entrenamiento personalizado, como en este tutorial, debe sumar las pérdidas por ejemplo y dividir la suma por GLOBAL_BATCH_SIZE:
scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE)
o puede usartf.nn.compute_average_loss
que toma la pérdida por ejemplo, pesos de muestra opcionales y GLOBAL_BATCH_SIZE como argumentos y devuelve la pérdida escalada.Si está utilizando pérdidas de regularización en su modelo, entonces necesita escalar el valor de pérdida por número de réplicas. Puede hacer esto usando la función
tf.nn.scale_regularization_loss
.No se recomienda usar
tf.reduce_mean
. Al hacerlo, se divide la pérdida por el tamaño real del lote por réplica, que puede variar de un paso a otro.Esta reducción y escalado se realiza automáticamente en keras
model.compile
ymodel.fit
Si usa clases
tf.keras.losses
(como en el ejemplo a continuación), la reducción de pérdida debe especificarse explícitamente comoNONE
oSUM
.AUTO
ySUM_OVER_BATCH_SIZE
no están permitidos cuando se usan contf.distribute.Strategy
.AUTO
no está permitido porque el usuario debe pensar explícitamente qué reducción quiere para asegurarse de que sea correcta en el caso distribuido.SUM_OVER_BATCH_SIZE
no está permitido porque actualmente solo dividiría por el tamaño del lote de réplicas y dejaría la división por el número de réplicas al usuario, lo que podría ser fácil pasar por alto. Entonces, en lugar de eso, le pedimos al usuario que haga la reducción por sí mismo explícitamente.Si
labels
son multidimensionales,per_example_loss
a través del número de elementos en cada muestra. Por ejemplo, si la forma de laspredictions
es(batch_size, H, W, n_classes)
ylabels
son(batch_size, H, W)
, deberá actualizarper_example_loss
como:per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)
with strategy.scope():
# Set reduction to `none` so we can do the reduction afterwards and divide by
# global batch size.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)
def compute_loss(labels, predictions):
per_example_loss = loss_object(labels, predictions)
return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
Defina las métricas para rastrear la pérdida y la precisión
Estas métricas rastrean la pérdida de prueba y el entrenamiento y la precisión de la prueba. Puede usar .result()
para obtener las estadísticas acumuladas en cualquier momento.
with strategy.scope():
test_loss = tf.keras.metrics.Mean(name='test_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='test_accuracy')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Bucle de entrenamiento
# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam()
checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
images, labels = inputs
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = compute_loss(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
train_accuracy.update_state(labels, predictions)
return loss
def test_step(inputs):
images, labels = inputs
predictions = model(images, training=False)
t_loss = loss_object(labels, predictions)
test_loss.update_state(t_loss)
test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
axis=None)
@tf.function
def distributed_test_step(dataset_inputs):
return strategy.run(test_step, args=(dataset_inputs,))
for epoch in range(EPOCHS):
# TRAIN LOOP
total_loss = 0.0
num_batches = 0
for x in train_dist_dataset:
total_loss += distributed_train_step(x)
num_batches += 1
train_loss = total_loss / num_batches
# TEST LOOP
for x in test_dist_dataset:
distributed_test_step(x)
if epoch % 2 == 0:
checkpoint.save(checkpoint_prefix)
template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
"Test Accuracy: {}")
print (template.format(epoch+1, train_loss,
train_accuracy.result()*100, test_loss.result(),
test_accuracy.result()*100))
test_loss.reset_states()
train_accuracy.reset_states()
test_accuracy.reset_states()
Epoch 1, Loss: 0.5106383562088013, Accuracy: 81.77999877929688, Test Loss: 0.39399346709251404, Test Accuracy: 85.79000091552734 Epoch 2, Loss: 0.3362727463245392, Accuracy: 87.91333770751953, Test Loss: 0.35871225595474243, Test Accuracy: 86.7699966430664 Epoch 3, Loss: 0.2928692400455475, Accuracy: 89.2683334350586, Test Loss: 0.2999486029148102, Test Accuracy: 89.04000091552734 Epoch 4, Loss: 0.2605818510055542, Accuracy: 90.41999816894531, Test Loss: 0.28474125266075134, Test Accuracy: 89.47000122070312 Epoch 5, Loss: 0.23641237616539001, Accuracy: 91.32166290283203, Test Loss: 0.26421546936035156, Test Accuracy: 90.41000366210938 Epoch 6, Loss: 0.2192477434873581, Accuracy: 91.90499877929688, Test Loss: 0.2650589942932129, Test Accuracy: 90.4800033569336 Epoch 7, Loss: 0.20016911625862122, Accuracy: 92.66999816894531, Test Loss: 0.25025954842567444, Test Accuracy: 90.9000015258789 Epoch 8, Loss: 0.18381091952323914, Accuracy: 93.26499938964844, Test Loss: 0.2585820257663727, Test Accuracy: 90.95999908447266 Epoch 9, Loss: 0.1699329912662506, Accuracy: 93.67500305175781, Test Loss: 0.26234227418899536, Test Accuracy: 91.0199966430664 Epoch 10, Loss: 0.15756534039974213, Accuracy: 94.16333770751953, Test Loss: 0.25516414642333984, Test Accuracy: 90.93000030517578
Cosas a tener en cuenta en el ejemplo anterior:
- Estamos iterando sobre
train_dist_dataset
ytest_dist_dataset
usando una construcciónfor x in ...
. - La pérdida escalada es el valor de retorno de
distributed_train_step
. Este valor se agrega entre réplicas mediante la llamadatf.distribute.Strategy.reduce
y luego entre lotes sumando el valor de retorno de las llamadastf.distribute.Strategy.reduce
. -
tf.keras.Metrics
debe actualizarse dentrotrain_step
ytest_step
que ejecutatf.distribute.Strategy.run
. *tf.distribute.Strategy.run
devuelve resultados de cada réplica local en la estrategia, y hay varias formas de consumir este resultado. Puede hacertf.distribute.Strategy.reduce
para obtener un valor agregado. También puede hacertf.distribute.Strategy.experimental_local_results
para obtener la lista de valores contenidos en el resultado, uno por réplica local.
Restaurar el último punto de control y prueba
Un modelo marcado con un tf.distribute.Strategy
se puede restaurar con o sin una estrategia.
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='eval_accuracy')
new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
predictions = new_model(images, training=False)
eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
for images, labels in test_dataset:
eval_step(images, labels)
print ('Accuracy after restoring the saved model without strategy: {}'.format(
eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.0199966430664
Formas alternativas de iterar sobre un conjunto de datos
Usar iteradores
Si desea iterar sobre una cantidad determinada de pasos y no a través de todo el conjunto de datos, puede crear un iterador usando la llamada iter
y llamar explícitamente a next
en el iterador. Puede optar por iterar sobre el conjunto de datos tanto dentro como fuera de la función tf. Aquí hay un pequeño fragmento que demuestra la iteración del conjunto de datos fuera de la función tf. usando un iterador.
for _ in range(EPOCHS):
total_loss = 0.0
num_batches = 0
train_iter = iter(train_dist_dataset)
for _ in range(10):
total_loss += distributed_train_step(next(train_iter))
num_batches += 1
average_train_loss = total_loss / num_batches
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
train_accuracy.reset_states()
Epoch 10, Loss: 0.17486707866191864, Accuracy: 93.4375 Epoch 10, Loss: 0.12386945635080338, Accuracy: 95.3125 Epoch 10, Loss: 0.16411852836608887, Accuracy: 93.90625 Epoch 10, Loss: 0.10728752613067627, Accuracy: 96.40625 Epoch 10, Loss: 0.11865834891796112, Accuracy: 95.625 Epoch 10, Loss: 0.12875251471996307, Accuracy: 95.15625 Epoch 10, Loss: 0.1189488023519516, Accuracy: 95.625 Epoch 10, Loss: 0.1456708014011383, Accuracy: 95.15625 Epoch 10, Loss: 0.12446556240320206, Accuracy: 95.3125 Epoch 10, Loss: 0.1380888819694519, Accuracy: 95.46875
Iterando dentro de una función tf.
También puede iterar sobre toda la entrada train_dist_dataset
dentro de una función tf usando la construcción for x in ...
o creando iteradores como lo hicimos anteriormente. El siguiente ejemplo muestra envolver una época de entrenamiento en una tf.function e iterar sobre train_dist_dataset
dentro de la función.
@tf.function
def distributed_train_epoch(dataset):
total_loss = 0.0
num_batches = 0
for x in dataset:
per_replica_losses = strategy.run(train_step, args=(x,))
total_loss += strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
num_batches += 1
return total_loss / tf.cast(num_batches, dtype=tf.float32)
for epoch in range(EPOCHS):
train_loss = distributed_train_epoch(train_dist_dataset)
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print (template.format(epoch+1, train_loss, train_accuracy.result()*100))
train_accuracy.reset_states()
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options. warnings.warn("To make it possible to preserve tf.data options across " Epoch 1, Loss: 0.14398494362831116, Accuracy: 94.63999938964844 Epoch 2, Loss: 0.13246288895606995, Accuracy: 94.97333526611328 Epoch 3, Loss: 0.11922841519117355, Accuracy: 95.63833618164062 Epoch 4, Loss: 0.11084160208702087, Accuracy: 95.99333190917969 Epoch 5, Loss: 0.10420522093772888, Accuracy: 96.0816650390625 Epoch 6, Loss: 0.09215126931667328, Accuracy: 96.63500213623047 Epoch 7, Loss: 0.0878651961684227, Accuracy: 96.67666625976562 Epoch 8, Loss: 0.07854588329792023, Accuracy: 97.09333038330078 Epoch 9, Loss: 0.07217177003622055, Accuracy: 97.34833526611328 Epoch 10, Loss: 0.06753655523061752, Accuracy: 97.48999786376953
Seguimiento de la pérdida de entrenamiento en réplicas
No recomendamos usar tf.metrics.Mean
para realizar un seguimiento de la pérdida de entrenamiento en diferentes réplicas, debido al cálculo de escala de pérdida que se lleva a cabo.
Por ejemplo, si ejecuta un trabajo de entrenamiento con las siguientes características:
- Dos réplicas
- Se procesan dos muestras en cada réplica.
- Valores de pérdida resultantes: [2, 3] y [4, 5] en cada réplica
- Tamaño de lote global = 4
Con el escalado de pérdida, calcula el valor de pérdida por muestra en cada réplica sumando los valores de pérdida y luego dividiendo por el tamaño de lote global. En este caso: (2 + 3) / 4 = 1.25
y (4 + 5) / 4 = 2.25
.
Si usa tf.metrics.Mean
para rastrear la pérdida en las dos réplicas, el resultado es diferente. En este ejemplo, termina con un total
de 3,50 y un count
de 2, lo que da como resultado total
/ count
= 1,75 cuando se llama a result()
en la métrica. La pérdida calculada con tf.keras.Metrics
se escala por un factor adicional que es igual al número de réplicas sincronizadas.
Guía y ejemplos
Estos son algunos ejemplos del uso de la estrategia de distribución con bucles de entrenamiento personalizados:
- Guía de formación distribuida
- Ejemplo de DenseNet usando
MirroredStrategy
. - Ejemplo de BERT entrenado con
MirroredStrategy
yTPUStrategy
. Este ejemplo es particularmente útil para comprender cómo cargar desde un punto de control y generar puntos de control periódicos durante el entrenamiento distribuido, etc. - Ejemplo de NCF entrenado con
MirroredStrategy
que se puede habilitar con el indicadorkeras_use_ctl
. - Ejemplo de NMT entrenado con
MirroredStrategy
.
Más ejemplos enumerados en la guía de estrategia de distribución .
Próximos pasos
- Pruebe la nueva API
tf.distribute.Strategy
en sus modelos. - Visite la sección Rendimiento de la guía para obtener más información sobre otras estrategias y herramientas que puede usar para optimizar el rendimiento de sus modelos de TensorFlow.