Visualizza su TensorFlow.org | Esegui in Google Colab | Visualizza l'origine su GitHub | Scarica quaderno |
Panoramica
Questo tutorial mostra come eseguire la formazione distribuita per più lavoratori con un modello Keras e l'API Model.fit
utilizzando l'API tf.distribute.Strategy
, in particolare la classe tf.distribute.MultiWorkerMirroredStrategy
. Con l'aiuto di questa strategia, un modello Keras progettato per essere eseguito su un singolo lavoratore può funzionare senza problemi su più lavoratori con modifiche minime al codice.
Per coloro che sono interessati a una comprensione più approfondita delle API tf.distribute.Strategy
, è disponibile la guida Distributed training in TensorFlow per una panoramica delle strategie di distribuzione supportate da TensorFlow.
Per informazioni su come utilizzare MultiWorkerMirroredStrategy
con Keras e un ciclo di formazione personalizzato, fare riferimento a Ciclo di formazione personalizzato con Keras e MultiWorkerMirroredStrategy .
Si noti che lo scopo di questa esercitazione è dimostrare un esempio minimo di più lavoratori con due lavoratori.
Impostare
Inizia con alcune importazioni necessarie:
import json
import os
import sys
Prima di importare TensorFlow, apportare alcune modifiche all'ambiente:
- Disabilita tutte le GPU. Ciò previene gli errori causati dai lavoratori che tentano tutti di utilizzare la stessa GPU. In un'applicazione del mondo reale, ogni lavoratore si troverebbe su una macchina diversa.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
- Reimposta la variabile di ambiente
TF_CONFIG
(ne saprai di più in seguito):
os.environ.pop('TF_CONFIG', None)
- Assicurati che la directory corrente si trovi nel percorso di Python: ciò consente al notebook di importare i file scritti da
%%writefile
secondo momento:
if '.' not in sys.path:
sys.path.insert(0, '.')
Ora importa TensorFlow:
import tensorflow as tf
Dataset e definizione del modello
Quindi, crea un file mnist_setup.py
con un modello semplice e una configurazione del set di dati. Questo file Python verrà utilizzato dai processi di lavoro in questo tutorial:
%%writefile mnist_setup.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the [0, 255] range.
# You need to convert them to float32 with values in the [0, 1] range.
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
return train_dataset
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
Writing mnist_setup.py
Formazione modello su un singolo lavoratore
Prova ad addestrare il modello per un numero limitato di epoche e osserva i risultati di un singolo lavoratore per assicurarti che tutto funzioni correttamente. Con il progredire dell'allenamento, la perdita dovrebbe diminuire e la precisione dovrebbe aumentare.
import mnist_setup
batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz 11493376/11490434 [==============================] - 0s 0us/step 11501568/11490434 [==============================] - 0s 0us/step 2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected Epoch 1/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788 Epoch 2/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185 Epoch 3/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795 <keras.callbacks.History at 0x7f666a2e4510>
Configurazione multi-operatore
Entriamo ora nel mondo della formazione multi-lavoratore.
Un cluster con lavori e attività
In TensorFlow, la formazione distribuita comprende: un 'cluster'
con diversi lavori e ciascuno dei lavori può avere uno o più 'task'
.
Avrai bisogno della variabile d'ambiente di configurazione TF_CONFIG
per l'addestramento su più macchine, ognuna delle quali possibilmente ha un ruolo diverso. TF_CONFIG
è una stringa JSON utilizzata per specificare la configurazione del cluster per ogni lavoratore che fa parte del cluster.
Ci sono due componenti di una variabile TF_CONFIG
: 'cluster'
e 'task'
.
Un
'cluster'
è lo stesso per tutti i lavoratori e fornisce informazioni sul cluster di formazione, che è un dict costituito da diversi tipi di lavoro, come'worker'
o'chief'
.- Nella formazione per più lavoratori con
tf.distribute.MultiWorkerMirroredStrategy
, di solito c'è un'worker'
che si assume responsabilità, come salvare un checkpoint e scrivere un file di riepilogo per TensorBoard, oltre a ciò che fa un normale'worker'
. Tale'worker'
è indicato come il capo lavoratore (con un nome di lavoro'chief'
). - È consuetudine che il
'chief'
abbia'index'
0
a cui è assegnato (in effetti, è così che viene implementatotf.distribute.Strategy
).
- Nella formazione per più lavoratori con
Un'attività fornisce informazioni
'task'
corrente ed è diversa per ogni lavoratore. Specifica il'type'
e'index'
di quel lavoratore.
Di seguito è riportato un esempio di configurazione:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
Ecco lo stesso TF_CONFIG
serializzato come stringa JSON:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
Nota che tf_config
è solo una variabile locale in Python. Per poterlo utilizzare per una configurazione di addestramento, questo dict deve essere serializzato come JSON e inserito in una variabile di ambiente TF_CONFIG
.
Nella configurazione di esempio sopra, hai impostato l'attività 'type'
su 'worker'
e l'attività 'index'
su 0
. Pertanto, questa macchina è il primo lavoratore. Sarà nominato 'chief'
e farà più lavoro degli altri.
A scopo illustrativo, questo tutorial mostra come impostare una variabile TF_CONFIG
con due worker su un localhost
.
In pratica, creeresti più lavoratori su indirizzi/porte IP esterni e imposteresti una variabile TF_CONFIG
su ciascun lavoratore di conseguenza.
In questo tutorial utilizzerai due lavoratori:
- Il primo (
'chief'
)TF_CONFIG
del lavoratore è mostrato sopra. - Per il secondo lavoratore, imposterai
tf_config['task']['index']=1
Variabili d'ambiente e sottoprocessi nei notebook
I sottoprocessi ereditano le variabili di ambiente dal loro genitore.
Ad esempio, puoi impostare una variabile di ambiente in questo processo Jupyter Notebook come segue:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
Quindi, puoi accedere alla variabile di ambiente da un sottoprocesso:
echo ${GREETINGS}
Hello TensorFlow!
Nella sezione successiva, utilizzerai un metodo simile per passare TF_CONFIG
ai sottoprocessi di lavoro. In uno scenario reale, non avvieresti i tuoi lavori in questo modo, ma in questo esempio è sufficiente.
Scegli la strategia giusta
In TensorFlow, ci sono due forme principali di formazione distribuita:
- Formazione sincrona , in cui le fasi della formazione sono sincronizzate tra i lavoratori e le repliche e
- Training asincrono , in cui i passaggi di training non sono strettamente sincronizzati (ad esempio, training del server dei parametri ).
Questa esercitazione mostra come eseguire la formazione sincrona per più lavoratori usando un'istanza di tf.distribute.MultiWorkerMirroredStrategy
.
MultiWorkerMirroredStrategy
crea copie di tutte le variabili nei livelli del modello su ciascun dispositivo in tutti i lavoratori. Utilizza CollectiveOps
, un'operazione TensorFlow per la comunicazione collettiva, per aggregare i gradienti e mantenere sincronizzate le variabili. La guida tf.distribute.Strategy
contiene maggiori dettagli su questa strategia.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
MultiWorkerMirroredStrategy
fornisce implementazioni multiple tramite il parametro tf.distribute.experimental.CommunicationOptions
: 1) RING
implementa collettivi basati su anello utilizzando gRPC come livello di comunicazione cross-host; 2) NCCL
utilizza la NVIDIA Collective Communication Library per implementare i collettivi; e 3) AUTO
rinvia la scelta al runtime. La migliore scelta di implementazione collettiva dipende dal numero e dal tipo di GPU e dall'interconnessione di rete nel cluster.
Allena il modello
Con l'integrazione dell'API tf.distribute.Strategy
in tf.keras
, l'unica modifica che apporterai per distribuire la formazione a più lavoratori è racchiudere la creazione del modello e la chiamata model.compile()
all'interno strategy.scope()
. L'ambito della strategia di distribuzione determina come e dove vengono create le variabili e, nel caso di MultiWorkerMirroredStrategy
, le variabili create sono MirroredVariable
e vengono replicate su ciascuno dei lavoratori.
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
Per eseguire effettivamente MultiWorkerMirroredStrategy
dovrai eseguire i processi di lavoro e passare loro un TF_CONFIG
.
Come il file mnist_setup.py
scritto in precedenza, ecco il main.py
che ciascuno dei lavoratori eseguirà:
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist_setup
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
strategy = tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py
Nello snippet di codice sopra, nota che global_batch_size
, che viene passato a Dataset.batch
, è impostato su per_worker_batch_size * num_workers
. Ciò garantisce che ogni lavoratore elabori batch di esempi per_worker_batch_size
indipendentemente dal numero di lavoratori.
La directory corrente ora contiene entrambi i file Python:
ls *.py
main.py mnist_setup.py
Quindi json-serializza TF_CONFIG
e aggiungilo alle variabili di ambiente:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Ora puoi avviare un processo di lavoro che eseguirà main.py
e utilizzerà TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
Ci sono alcune cose da notare sul comando precedente:
- Usa
%%bash
che è una "magia" del notebook per eseguire alcuni comandi bash. - Utilizza il flag
--bg
per eseguire il processobash
in background, perché questo lavoratore non verrà terminato. Aspetta tutti i lavoratori prima di iniziare.
Il processo di lavoro in background non stamperà l'output su questo notebook, quindi &>
reindirizza il suo output a un file in modo che tu possa controllare cosa è successo in un file di registro in un secondo momento.
Quindi, attendi qualche secondo affinché il processo si avvii:
import time
time.sleep(10)
Ora, controlla cosa è stato prodotto finora nel file di registro del lavoratore:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
L'ultima riga del file di registro dovrebbe dire: Started server with target: grpc://localhost:12345
. Il primo lavoratore è ora pronto e attende che tutti gli altri lavoratori siano pronti per procedere.
Quindi aggiorna tf_config
affinché il processo del secondo lavoratore raccolga:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Lancia il secondo lavoratore. Questo avvierà la formazione poiché tutti i lavoratori sono attivi (quindi non è necessario eseguire il background di questo processo):
python main.py
Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901 2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Se ricontrolli i log scritti dal primo lavoratore, scoprirai che ha partecipato alla formazione di quel modello:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
Non sorprende che sia stato eseguito più lentamente del test eseguito all'inizio di questo tutorial.
L'esecuzione di più lavoratori su una singola macchina aggiunge solo un sovraccarico.
L'obiettivo qui non era quello di migliorare i tempi di formazione, ma solo di fare un esempio di formazione multi-lavoratore.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
Formazione multi-lavoratore in profondità
Finora, hai imparato come eseguire una configurazione multi-lavoratore di base.
Durante il resto del tutorial, imparerai in dettaglio altri fattori, che possono essere utili o importanti per casi d'uso reali.
Partizionamento orizzontale del set di dati
Nella formazione multi-lavoratore, il partizionamento orizzontale del set di dati è necessario per garantire convergenza e prestazioni.
L'esempio nella sezione precedente si basa sull'autosharding predefinito fornito dall'API tf.distribute.Strategy
. È possibile controllare lo sharding impostando tf.data.experimental.AutoShardPolicy
di tf.data.experimental.DistributeOptions
.
Per ulteriori informazioni sull'auto-sharding , fare riferimento alla Guida all'input distribuito .
Ecco un rapido esempio di come disattivare il partizionamento orizzontale automatico, in modo che ogni replica elabori ogni esempio ( non consigliato ):
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
Valutazione
Se passi i validation_data
in Model.fit
, si alternerà tra training e valutazione per ogni epoca. La valutazione che prende i dati di validation_data
è distribuita sullo stesso insieme di lavoratori ei risultati della valutazione sono aggregati e disponibili per tutti i lavoratori.
Analogamente all'addestramento, il set di dati di convalida viene automaticamente partizionato a livello di file. È necessario impostare una dimensione batch globale nel set di dati di convalida e impostare validation_steps
.
Per la valutazione si raccomanda anche un set di dati ripetuto.
In alternativa, puoi anche creare un'altra attività che legge periodicamente i checkpoint ed esegue la valutazione. Questo è ciò che fa Estimator. Ma questo non è un modo consigliato per eseguire la valutazione e quindi i suoi dettagli vengono omessi.
Prestazione
Ora hai un modello Keras che è tutto configurato per l'esecuzione in più worker con MultiWorkerMirroredStrategy
.
Per modificare le prestazioni della formazione multi-lavoratore, puoi provare quanto segue:
tf.distribute.MultiWorkerMirroredStrategy
fornisce più implementazioni di comunicazione collettiva :-
RING
implementa collettivi basati su anello utilizzando gRPC come livello di comunicazione tra host. -
NCCL
utilizza la NVIDIA Collective Communication Library per implementare i collettivi. -
AUTO
rinvia la scelta al runtime.
La migliore scelta di implementazione collettiva dipende dal numero di GPU, dal tipo di GPU e dall'interconnessione di rete nel cluster. Per ignorare la scelta automatica, specificare il parametro
communication_options
del costruttore diMultiWorkerMirroredStrategy
. Per esempio:communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
-
Trasmetti le variabili su
tf.float
se possibile:- Il modello ufficiale di ResNet include un esempio di come questo può essere fatto.
Tolleranza ai guasti
Nell'addestramento sincrono, il cluster fallirebbe se uno dei lavoratori si guasta e non esiste alcun meccanismo di ripristino degli errori.
L'utilizzo di Keras con tf.distribute.Strategy
offre il vantaggio della tolleranza agli errori nei casi in cui i lavoratori muoiono o siano altrimenti instabili. Puoi farlo preservando lo stato di addestramento nel file system distribuito di tua scelta, in modo tale che al riavvio dell'istanza che in precedenza non è riuscita o è stata annullata, lo stato di addestramento venga ripristinato.
Quando un lavoratore diventa non disponibile, gli altri lavoratori falliranno (possibilmente dopo un timeout). In questi casi, il lavoratore non disponibile deve essere riavviato, così come gli altri lavoratori che hanno fallito.
Richiamata ModelCheckpoint
La richiamata ModelCheckpoint
non fornisce più la funzionalità di tolleranza agli errori, utilizzare invece la richiamata BackupAndRestore
.
La richiamata ModelCheckpoint
può ancora essere utilizzata per salvare i checkpoint. Ma con questo, se l'allenamento è stato interrotto o terminato con successo, per continuare l'allenamento dal checkpoint, l'utente è responsabile di caricare manualmente il modello.
Facoltativamente, l'utente può scegliere di salvare e ripristinare il modello/pesi al di fuori della richiamata ModelCheckpoint
.
Salvataggio e caricamento del modello
Per salvare il modello utilizzando model.save
o tf.saved_model.save
, la destinazione di salvataggio deve essere diversa per ogni lavoratore.
- Per i lavoratori non capi, dovrai salvare il modello in una directory temporanea.
- Per il capo, dovrai salvare nella directory del modello fornita.
Le directory temporanee sul lavoratore devono essere univoche per evitare errori derivanti da più lavoratori che tentano di scrivere nella stessa posizione.
Il modello salvato in tutte le directory è identico e in genere solo il modello salvato dal capo deve essere referenziato per il ripristino o il servizio.
Dovresti avere una logica di pulizia che elimini le directory temporanee create dai lavoratori una volta completata la tua formazione.
Il motivo per risparmiare sul capo e sui lavoratori allo stesso tempo è perché potresti aggregare variabili durante il checkpoint che richiede che sia il capo che i lavoratori partecipino al protocollo di comunicazione allreduce. D'altra parte, lasciare che capo e lavoratori salvino nella stessa directory del modello comporterà errori dovuti alla contesa.
Utilizzando MultiWorkerMirroredStrategy
, il programma viene eseguito su ogni lavoratore e, per sapere se il lavoratore corrente è il capo, sfrutta l'oggetto risolutore del cluster che ha attributi task_type
e task_id
:
-
task_type
ti dice qual è il lavoro corrente (es'worker'
). -
task_id
ti dice l'identificatore del lavoratore. - Il lavoratore con
task_id == 0
è designato come capo lavoratore.
Nel frammento di codice seguente, la funzione write_filepath
fornisce il percorso del file da scrivere, che dipende dal task_id
del lavoratore:
- Per il capo lavoratore (con
task_id == 0
), scrive nel percorso del file originale. - Per gli altri lavoratori, crea una directory temporanea,
temp_dir
, contask_id
nel percorso della directory in cui scrivere:
model_path = '/tmp/keras-model'
def _is_chief(task_type, task_id):
# Note: there are two possible `TF_CONFIG` configuration.
# 1) In addition to `worker` tasks, a `chief` task type is use;
# in this case, this function should be modified to
# `return task_type == 'chief'`.
# 2) Only `worker` task type is used; in this case, worker 0 is
# regarded as the chief. The implementation demonstrated here
# is for this case.
# For the purpose of this Colab section, the `task_type is None` case
# is added because it is effectively run with only a single worker.
return (task_type == 'worker' and task_id == 0) or task_type is None
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
Con questo, ora sei pronto per salvare:
multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/keras-model/assets INFO:tensorflow:Assets written to: /tmp/keras-model/assets
Come descritto sopra, in seguito il modello dovrebbe essere caricato solo dal percorso salvato dal capo, quindi rimuoviamo quelli temporanei salvati dai non capi:
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(os.path.dirname(write_model_path))
Ora, quando è il momento di caricare, utilizziamo la comoda API tf.keras.models.load_model
e continuiamo con ulteriore lavoro.
Qui, supponi di utilizzare solo un singolo lavoratore per caricare e continuare l'addestramento, nel qual caso non chiami tf.keras.models.load_model
all'interno di un altro strategy.scope()
(nota che strategy = tf.distribute.MultiWorkerMirroredStrategy()
, come definito in precedenza ):
loaded_model = tf.keras.models.load_model(model_path)
# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2 20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773 <keras.callbacks.History at 0x7f6669989750>
Salvataggio e ripristino del checkpoint
D'altra parte, il checkpoint ti consente di salvare i pesi del tuo modello e ripristinarli senza dover salvare l'intero modello.
Qui creerai un tf.train.Checkpoint
che tiene traccia del modello, che è gestito da tf.train.CheckpointManager
, in modo che venga preservato solo l'ultimo checkpoint:
checkpoint_dir = '/tmp/ckpt'
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
Una volta impostato CheckpointManager
, sei pronto per salvare e rimuovere i checkpoint salvati dai non capi:
checkpoint_manager.save()
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(write_checkpoint_dir)
Ora, quando devi ripristinare il modello, puoi trovare l'ultimo checkpoint salvato utilizzando la comoda funzione tf.train.latest_checkpoint
. Dopo aver ripristinato il checkpoint, puoi continuare con l'allenamento.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/2 2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. 20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938 <keras.callbacks.History at 0x7f6669589850>
Richiamata di backup e ripristino
Il callback tf.keras.callbacks.BackupAndRestore
fornisce la funzionalità di tolleranza agli errori eseguendo il backup del modello e del numero di epoch corrente in un file di checkpoint temporaneo nell'argomento BackupAndRestore
backup_dir
Questo viene fatto alla fine di ogni epoca.
Una volta che i lavori vengono interrotti e riavviati, la richiamata ripristina l'ultimo checkpoint e la formazione continua dall'inizio dell'epoca interrotta. Qualsiasi addestramento parziale già fatto nell'epoca incompiuta prima dell'interruzione verrà buttato via, in modo che non influisca sullo stato finale del modello.
Per usarlo, fornisci un'istanza di tf.keras.callbacks.BackupAndRestore
alla chiamata Model.fit
.
Con MultiWorkerMirroredStrategy
, se un lavoratore viene interrotto, l'intero cluster si interrompe fino al riavvio del lavoratore interrotto. Anche gli altri worker verranno riavviati e il worker interrotto si unirà nuovamente al cluster. Quindi, ogni lavoratore legge il file del checkpoint precedentemente salvato e riprende il suo stato precedente, consentendo così al cluster di tornare sincronizzato. Poi, la formazione continua.
La richiamata BackupAndRestore
utilizza CheckpointManager
per salvare e ripristinare lo stato di addestramento, che genera un file chiamato checkpoint che tiene traccia dei checkpoint esistenti insieme a quello più recente. Per questo motivo, backup_dir
non dovrebbe essere riutilizzato per memorizzare altri checkpoint al fine di evitare collisioni di nomi.
Attualmente, il callback BackupAndRestore
supporta la formazione per singolo lavoratore senza strategia, MirroredStrategy
, e la formazione per più lavoratori con MultiWorkerMirroredStrategy
.
Di seguito sono riportati due esempi sia per la formazione multi-operatore che per la formazione mono-operatore:
# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/3 70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123 Epoch 2/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509 Epoch 3/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614 <keras.callbacks.History at 0x7f6669555d90>
Se controlli la directory di backup_dir
specificata in BackupAndRestore
, potresti notare alcuni file di checkpoint generati temporaneamente. Questi file sono necessari per recuperare le istanze perse in precedenza e verranno rimossi dalla libreria alla fine di Model.fit
al termine della formazione.
Risorse addizionali
- La guida Formazione distribuita in TensorFlow fornisce una panoramica delle strategie di distribuzione disponibili.
- L' esercitazione Ciclo di formazione personalizzato con Keras e MultiWorkerMirroredStrategy mostra come usare
MultiWorkerMirroredStrategy
con Keras e un ciclo di formazione personalizzato. - Scopri i modelli ufficiali , molti dei quali possono essere configurati per eseguire più strategie di distribuzione.
- La guida Prestazioni migliori con tf.function fornisce informazioni su altre strategie e strumenti, come TensorFlow Profiler che puoi utilizzare per ottimizzare le prestazioni dei tuoi modelli TensorFlow.