Visualizza su TensorFlow.org | Esegui in Google Colab | Visualizza l'origine su GitHub | Scarica quaderno |
Questo tutorial mostra come usare tf.distribute.Strategy
con cicli di formazione personalizzati. Addestreremo un semplice modello CNN sul set di dati fashion MNIST. Il set di dati fashion MNIST contiene 60000 immagini di treni di dimensioni 28 x 28 e 10000 immagini di test di dimensioni 28 x 28.
Utilizziamo cicli di allenamento personalizzati per addestrare il nostro modello perché ci danno flessibilità e un maggiore controllo sull'allenamento. Inoltre, è più semplice eseguire il debug del modello e del ciclo di addestramento.
# Import TensorFlow
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
Scarica il set di dati MNIST della moda
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
# Adding a dimension to the array -> new shape == (28, 28, 1)
# We are doing this because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]
# Getting the images in [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
Creare una strategia per distribuire le variabili e il grafico
Come funziona la strategia tf.distribute.MirroredStrategy
?
- Tutte le variabili e il grafico del modello vengono replicati sulle repliche.
- L'input è distribuito uniformemente tra le repliche.
- Ogni replica calcola la perdita e i gradienti per l'input ricevuto.
- I gradienti vengono sincronizzati su tutte le repliche sommandole.
- Dopo la sincronizzazione, lo stesso aggiornamento viene effettuato alle copie delle variabili su ciascuna replica.
# If the list of devices is not specified in the
# `tf.distribute.MirroredStrategy` constructor, it will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
print ('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 1
Imposta la pipeline di input
Esporta il grafico e le variabili nel formato SavedModel indipendente dalla piattaforma. Dopo che il tuo modello è stato salvato, puoi caricarlo con o senza l'oscilloscopio.
BUFFER_SIZE = len(train_images)
BATCH_SIZE_PER_REPLICA = 64
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
EPOCHS = 10
Crea i set di dati e distribuiscili:
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
2022-01-26 05:45:53.991501: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_UINT8 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } dim { size: 1 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_UINT8 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_UINT8 } } } } } 2022-01-26 05:45:54.034762: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_UINT8 } } } attr { key: "_cardinality" value { i: 10000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:3" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } dim { size: 1 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_UINT8 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_UINT8 } } } } }
Crea il modello
Crea un modello usando tf.keras.Sequential
. Puoi anche utilizzare l'API Model Subclassing per farlo.
def create_model():
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(64, 3, activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])
return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
Definire la funzione di perdita
Normalmente, su una singola macchina con 1 GPU/CPU, la perdita è divisa per il numero di esempi nel batch di input.
Quindi, come dovrebbe essere calcolata la perdita quando si utilizza un tf.distribute.Strategy
?
Ad esempio, supponiamo che tu abbia 4 GPU e una dimensione batch di 64. Un batch di input è distribuito tra le repliche (4 GPU), ciascuna replica riceve un input di dimensione 16.
Il modello su ciascuna replica esegue un passaggio in avanti con il rispettivo input e calcola la perdita. Ora, invece di dividere la perdita per il numero di esempi nel rispettivo input (BATCH_SIZE_PER_REPLICA = 16), la perdita dovrebbe essere divisa per GLOBAL_BATCH_SIZE (64).
Perché farlo?
- Questo deve essere fatto perché dopo che i gradienti sono stati calcolati su ciascuna replica, vengono sincronizzati tra le repliche sommandoli .
Come farlo in TensorFlow?
Se stai scrivendo un ciclo di allenamento personalizzato, come in questo tutorial, dovresti sommare le perdite per esempio e dividere la somma per GLOBAL_BATCH_SIZE:
scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE)
oppure puoi usaretf.nn.compute_average_loss
che prende la perdita per esempio, i pesi campione facoltativi e GLOBAL_BATCH_SIZE come argomenti e restituisce la perdita in scala.Se stai utilizzando le perdite di regolarizzazione nel tuo modello, devi ridimensionare il valore della perdita in base al numero di repliche. Puoi farlo usando la funzione
tf.nn.scale_regularization_loss
.L'utilizzo
tf.reduce_mean
non è consigliato. In questo modo si divide la perdita per la dimensione effettiva del batch di replica che può variare passo dopo passo.Questa riduzione e ridimensionamento viene eseguita automaticamente in keras
model.compile
emodel.fit
Se si utilizzano classi
tf.keras.losses
(come nell'esempio seguente), la riduzione della perdita deve essere specificata esplicitamente come una traNONE
oSUM
.AUTO
eSUM_OVER_BATCH_SIZE
non sono consentiti se utilizzati contf.distribute.Strategy
.AUTO
non è consentito perché l'utente dovrebbe pensare esplicitamente a quale riduzione desidera per assicurarsi che sia corretta nel caso distribuito.SUM_OVER_BATCH_SIZE
non è consentito perché attualmente dividerebbe solo per dimensione batch di replica e lascerebbe la divisione per numero di repliche all'utente, cosa che potrebbe essere facile da perdere. Quindi, invece, chiediamo all'utente di eseguire la riduzione in modo esplicito.Se
labels
sono multidimensionali, calcola la media diper_example_loss
sul numero di elementi in ciascun campione. Ad esempio, se la forma dellepredictions
è(batch_size, H, W, n_classes)
elabels
è(batch_size, H, W)
, dovrai aggiornareper_example_loss
come:per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)
with strategy.scope():
# Set reduction to `none` so we can do the reduction afterwards and divide by
# global batch size.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)
def compute_loss(labels, predictions):
per_example_loss = loss_object(labels, predictions)
return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
Definisci le metriche per monitorare la perdita e l'accuratezza
Queste metriche tengono traccia della perdita del test, della formazione e dell'accuratezza del test. Puoi utilizzare .result()
per ottenere le statistiche accumulate in qualsiasi momento.
with strategy.scope():
test_loss = tf.keras.metrics.Mean(name='test_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='test_accuracy')
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',). INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Ciclo di formazione
# model, optimizer, and checkpoint must be created under `strategy.scope`.
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam()
checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
images, labels = inputs
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = compute_loss(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
train_accuracy.update_state(labels, predictions)
return loss
def test_step(inputs):
images, labels = inputs
predictions = model(images, training=False)
t_loss = loss_object(labels, predictions)
test_loss.update_state(t_loss)
test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
@tf.function
def distributed_train_step(dataset_inputs):
per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
axis=None)
@tf.function
def distributed_test_step(dataset_inputs):
return strategy.run(test_step, args=(dataset_inputs,))
for epoch in range(EPOCHS):
# TRAIN LOOP
total_loss = 0.0
num_batches = 0
for x in train_dist_dataset:
total_loss += distributed_train_step(x)
num_batches += 1
train_loss = total_loss / num_batches
# TEST LOOP
for x in test_dist_dataset:
distributed_test_step(x)
if epoch % 2 == 0:
checkpoint.save(checkpoint_prefix)
template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
"Test Accuracy: {}")
print (template.format(epoch+1, train_loss,
train_accuracy.result()*100, test_loss.result(),
test_accuracy.result()*100))
test_loss.reset_states()
train_accuracy.reset_states()
test_accuracy.reset_states()
Epoch 1, Loss: 0.5106383562088013, Accuracy: 81.77999877929688, Test Loss: 0.39399346709251404, Test Accuracy: 85.79000091552734 Epoch 2, Loss: 0.3362727463245392, Accuracy: 87.91333770751953, Test Loss: 0.35871225595474243, Test Accuracy: 86.7699966430664 Epoch 3, Loss: 0.2928692400455475, Accuracy: 89.2683334350586, Test Loss: 0.2999486029148102, Test Accuracy: 89.04000091552734 Epoch 4, Loss: 0.2605818510055542, Accuracy: 90.41999816894531, Test Loss: 0.28474125266075134, Test Accuracy: 89.47000122070312 Epoch 5, Loss: 0.23641237616539001, Accuracy: 91.32166290283203, Test Loss: 0.26421546936035156, Test Accuracy: 90.41000366210938 Epoch 6, Loss: 0.2192477434873581, Accuracy: 91.90499877929688, Test Loss: 0.2650589942932129, Test Accuracy: 90.4800033569336 Epoch 7, Loss: 0.20016911625862122, Accuracy: 92.66999816894531, Test Loss: 0.25025954842567444, Test Accuracy: 90.9000015258789 Epoch 8, Loss: 0.18381091952323914, Accuracy: 93.26499938964844, Test Loss: 0.2585820257663727, Test Accuracy: 90.95999908447266 Epoch 9, Loss: 0.1699329912662506, Accuracy: 93.67500305175781, Test Loss: 0.26234227418899536, Test Accuracy: 91.0199966430664 Epoch 10, Loss: 0.15756534039974213, Accuracy: 94.16333770751953, Test Loss: 0.25516414642333984, Test Accuracy: 90.93000030517578
Cose da notare nell'esempio sopra:
- Stiamo iterando su
train_dist_dataset
etest_dist_dataset
usando un costruttofor x in ...
- La perdita scalata è il valore di ritorno di
distributed_train_step
. Questo valore viene aggregato tra le repliche utilizzando la chiamatatf.distribute.Strategy.reduce
e quindi tra i batch sommando il valore restituito dalle chiamatetf.distribute.Strategy.reduce
. -
tf.keras.Metrics
dovrebbe essere aggiornato all'internotrain_step
etest_step
che viene eseguito datf.distribute.Strategy.run
. *tf.distribute.Strategy.run
restituisce i risultati di ogni replica locale nella strategia e sono disponibili diversi modi per utilizzare questo risultato. Puoi eseguiretf.distribute.Strategy.reduce
per ottenere un valore aggregato. Puoi anche eseguiretf.distribute.Strategy.experimental_local_results
per ottenere l'elenco dei valori contenuti nel risultato, uno per replica locale.
Ripristina l'ultimo checkpoint e prova
Un modello sottoposto a checkpoint con tf.distribute.Strategy
può essere ripristinato con o senza una strategia.
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='eval_accuracy')
new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
@tf.function
def eval_step(images, labels):
predictions = new_model(images, training=False)
eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
for images, labels in test_dataset:
eval_step(images, labels)
print ('Accuracy after restoring the saved model without strategy: {}'.format(
eval_accuracy.result()*100))
Accuracy after restoring the saved model without strategy: 91.0199966430664
Modi alternativi di iterazione su un set di dati
Utilizzo di iteratori
Se si desidera eseguire l'iterazione su un determinato numero di passaggi e non sull'intero set di dati, è possibile creare un iteratore utilizzando la chiamata iter
e la chiamata esplicita next
sull'iteratore. Puoi scegliere di scorrere il set di dati sia all'interno che all'esterno della funzione tf. Ecco un piccolo frammento che mostra l'iterazione del set di dati al di fuori di tf.function usando un iteratore.
for _ in range(EPOCHS):
total_loss = 0.0
num_batches = 0
train_iter = iter(train_dist_dataset)
for _ in range(10):
total_loss += distributed_train_step(next(train_iter))
num_batches += 1
average_train_loss = total_loss / num_batches
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print (template.format(epoch+1, average_train_loss, train_accuracy.result()*100))
train_accuracy.reset_states()
Epoch 10, Loss: 0.17486707866191864, Accuracy: 93.4375 Epoch 10, Loss: 0.12386945635080338, Accuracy: 95.3125 Epoch 10, Loss: 0.16411852836608887, Accuracy: 93.90625 Epoch 10, Loss: 0.10728752613067627, Accuracy: 96.40625 Epoch 10, Loss: 0.11865834891796112, Accuracy: 95.625 Epoch 10, Loss: 0.12875251471996307, Accuracy: 95.15625 Epoch 10, Loss: 0.1189488023519516, Accuracy: 95.625 Epoch 10, Loss: 0.1456708014011383, Accuracy: 95.15625 Epoch 10, Loss: 0.12446556240320206, Accuracy: 95.3125 Epoch 10, Loss: 0.1380888819694519, Accuracy: 95.46875
Iterazione all'interno di una funzione tf
Puoi anche scorrere l'intero input train_dist_dataset
all'interno di una funzione tf. usando il costrutto for x in ...
o creando iteratori come abbiamo fatto sopra. L'esempio seguente mostra il wrapping di un'epoca di training in un tf.function e l'iterazione su train_dist_dataset
all'interno della funzione.
@tf.function
def distributed_train_epoch(dataset):
total_loss = 0.0
num_batches = 0
for x in dataset:
per_replica_losses = strategy.run(train_step, args=(x,))
total_loss += strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
num_batches += 1
return total_loss / tf.cast(num_batches, dtype=tf.float32)
for epoch in range(EPOCHS):
train_loss = distributed_train_epoch(train_dist_dataset)
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print (template.format(epoch+1, train_loss, train_accuracy.result()*100))
train_accuracy.reset_states()
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options. warnings.warn("To make it possible to preserve tf.data options across " Epoch 1, Loss: 0.14398494362831116, Accuracy: 94.63999938964844 Epoch 2, Loss: 0.13246288895606995, Accuracy: 94.97333526611328 Epoch 3, Loss: 0.11922841519117355, Accuracy: 95.63833618164062 Epoch 4, Loss: 0.11084160208702087, Accuracy: 95.99333190917969 Epoch 5, Loss: 0.10420522093772888, Accuracy: 96.0816650390625 Epoch 6, Loss: 0.09215126931667328, Accuracy: 96.63500213623047 Epoch 7, Loss: 0.0878651961684227, Accuracy: 96.67666625976562 Epoch 8, Loss: 0.07854588329792023, Accuracy: 97.09333038330078 Epoch 9, Loss: 0.07217177003622055, Accuracy: 97.34833526611328 Epoch 10, Loss: 0.06753655523061752, Accuracy: 97.48999786376953
Monitoraggio della perdita di allenamento tra le repliche
Non è consigliabile utilizzare tf.metrics.Mean
per tenere traccia della perdita di addestramento su diverse repliche, a causa del calcolo del ridimensionamento della perdita eseguito.
Ad esempio, se si esegue un processo di formazione con le seguenti caratteristiche:
- Due repliche
- Due campioni vengono elaborati su ciascuna replica
- Valori di perdita risultanti: [2, 3] e [4, 5] su ciascuna replica
- Dimensione globale del lotto = 4
Con il ridimensionamento delle perdite, si calcola il valore della perdita per campione su ciascuna replica aggiungendo i valori di perdita e quindi dividendo per la dimensione del batch globale. In questo caso: (2 + 3) / 4 = 1.25
e (4 + 5) / 4 = 2.25
.
Se utilizzi tf.metrics.Mean
per tenere traccia della perdita tra le due repliche, il risultato è diverso. In questo esempio, si ottiene un total
di 3,50 e un count
di 2, che risulta in total
/ count
= 1,75 quando result()
viene chiamato sulla metrica. La perdita calcolata con tf.keras.Metrics
viene ridimensionata di un fattore aggiuntivo uguale al numero di repliche sincronizzate.
Guida ed esempi
Di seguito sono riportati alcuni esempi di utilizzo della strategia di distribuzione con cicli di formazione personalizzati:
- Guida alla formazione distribuita
- Esempio DenseNet usando
MirroredStrategy
. - Esempio BERT addestrato utilizzando
MirroredStrategy
eTPUStrategy
. Questo esempio è particolarmente utile per capire come caricare da un checkpoint e generare checkpoint periodici durante l'addestramento distribuito, ecc. - Esempio NCF addestrato utilizzando
MirroredStrategy
che può essere abilitato utilizzando il flagkeras_use_ctl
. - Esempio NMT addestrato utilizzando
MirroredStrategy
.
Altri esempi elencati nella guida alla strategia di distribuzione .
Prossimi passi
- Prova la nuova API
tf.distribute.Strategy
sui tuoi modelli. - Visita la sezione Prestazioni nella guida per saperne di più su altre strategie e strumenti che puoi utilizzare per ottimizzare le prestazioni dei tuoi modelli TensorFlow.