Formazione multi-lavoratore con Keras

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza l'origine su GitHub Scarica quaderno

Panoramica

Questo tutorial mostra come eseguire la formazione distribuita per più lavoratori con un modello Keras e l'API Model.fit utilizzando l'API tf.distribute.Strategy , in particolare la classe tf.distribute.MultiWorkerMirroredStrategy . Con l'aiuto di questa strategia, un modello Keras progettato per essere eseguito su un singolo lavoratore può funzionare senza problemi su più lavoratori con modifiche minime al codice.

Per coloro che sono interessati a una comprensione più approfondita delle API tf.distribute.Strategy , è disponibile la guida Distributed training in TensorFlow per una panoramica delle strategie di distribuzione supportate da TensorFlow.

Per informazioni su come utilizzare MultiWorkerMirroredStrategy con Keras e un ciclo di formazione personalizzato, fare riferimento a Ciclo di formazione personalizzato con Keras e MultiWorkerMirroredStrategy .

Si noti che lo scopo di questa esercitazione è dimostrare un esempio minimo di più lavoratori con due lavoratori.

Impostare

Inizia con alcune importazioni necessarie:

import json
import os
import sys

Prima di importare TensorFlow, apportare alcune modifiche all'ambiente:

  1. Disabilita tutte le GPU. Ciò previene gli errori causati dai lavoratori che tentano tutti di utilizzare la stessa GPU. In un'applicazione del mondo reale, ogni lavoratore si troverebbe su una macchina diversa.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. Reimposta la variabile di ambiente TF_CONFIG (ne saprai di più in seguito):
os.environ.pop('TF_CONFIG', None)
  1. Assicurati che la directory corrente si trovi nel percorso di Python: ciò consente al notebook di importare i file scritti da %%writefile secondo momento:
if '.' not in sys.path:
  sys.path.insert(0, '.')

Ora importa TensorFlow:

import tensorflow as tf

Dataset e definizione del modello

Quindi, crea un file mnist_setup.py con un modello semplice e una configurazione del set di dati. Questo file Python verrà utilizzato dai processi di lavoro in questo tutorial:

%%writefile mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist_setup.py

Formazione modello su un singolo lavoratore

Prova ad addestrare il modello per un numero limitato di epoche e osserva i risultati di un singolo lavoratore per assicurarti che tutto funzioni correttamente. Con il progredire dell'allenamento, la perdita dovrebbe diminuire e la precisione dovrebbe aumentare.

import mnist_setup

batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Epoch 1/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795
<keras.callbacks.History at 0x7f666a2e4510>

Configurazione multi-operatore

Entriamo ora nel mondo della formazione multi-lavoratore.

Un cluster con lavori e attività

In TensorFlow, la formazione distribuita comprende: un 'cluster' con diversi lavori e ciascuno dei lavori può avere uno o più 'task' .

Avrai bisogno della variabile d'ambiente di configurazione TF_CONFIG per l'addestramento su più macchine, ognuna delle quali possibilmente ha un ruolo diverso. TF_CONFIG è una stringa JSON utilizzata per specificare la configurazione del cluster per ogni lavoratore che fa parte del cluster.

Ci sono due componenti di una variabile TF_CONFIG : 'cluster' e 'task' .

  • Un 'cluster' è lo stesso per tutti i lavoratori e fornisce informazioni sul cluster di formazione, che è un dict costituito da diversi tipi di lavoro, come 'worker' o 'chief' .

    • Nella formazione per più lavoratori con tf.distribute.MultiWorkerMirroredStrategy , di solito c'è un 'worker' che si assume responsabilità, come salvare un checkpoint e scrivere un file di riepilogo per TensorBoard, oltre a ciò che fa un normale 'worker' . Tale 'worker' è indicato come il capo lavoratore (con un nome di lavoro 'chief' ).
    • È consuetudine che il 'chief' abbia 'index' 0 a cui è assegnato (in effetti, è così che viene implementato tf.distribute.Strategy ).
  • Un'attività fornisce informazioni 'task' corrente ed è diversa per ogni lavoratore. Specifica il 'type' e 'index' di quel lavoratore.

Di seguito è riportato un esempio di configurazione:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Ecco lo stesso TF_CONFIG serializzato come stringa JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Nota che tf_config è solo una variabile locale in Python. Per poterlo utilizzare per una configurazione di addestramento, questo dict deve essere serializzato come JSON e inserito in una variabile di ambiente TF_CONFIG .

Nella configurazione di esempio sopra, hai impostato l'attività 'type' su 'worker' e l'attività 'index' su 0 . Pertanto, questa macchina è il primo lavoratore. Sarà nominato 'chief' e farà più lavoro degli altri.

A scopo illustrativo, questo tutorial mostra come impostare una variabile TF_CONFIG con due worker su un localhost .

In pratica, creeresti più lavoratori su indirizzi/porte IP esterni e imposteresti una variabile TF_CONFIG su ciascun lavoratore di conseguenza.

In questo tutorial utilizzerai due lavoratori:

  • Il primo ( 'chief' ) TF_CONFIG del lavoratore è mostrato sopra.
  • Per il secondo lavoratore, imposterai tf_config['task']['index']=1

Variabili d'ambiente e sottoprocessi nei notebook

I sottoprocessi ereditano le variabili di ambiente dal loro genitore.

Ad esempio, puoi impostare una variabile di ambiente in questo processo Jupyter Notebook come segue:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Quindi, puoi accedere alla variabile di ambiente da un sottoprocesso:

echo ${GREETINGS}
Hello TensorFlow!

Nella sezione successiva, utilizzerai un metodo simile per passare TF_CONFIG ai sottoprocessi di lavoro. In uno scenario reale, non avvieresti i tuoi lavori in questo modo, ma in questo esempio è sufficiente.

Scegli la strategia giusta

In TensorFlow, ci sono due forme principali di formazione distribuita:

  • Formazione sincrona , in cui le fasi della formazione sono sincronizzate tra i lavoratori e le repliche e
  • Training asincrono , in cui i passaggi di training non sono strettamente sincronizzati (ad esempio, training del server dei parametri ).

Questa esercitazione mostra come eseguire la formazione sincrona per più lavoratori usando un'istanza di tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy crea copie di tutte le variabili nei livelli del modello su ciascun dispositivo in tutti i lavoratori. Utilizza CollectiveOps , un'operazione TensorFlow per la comunicazione collettiva, per aggregare i gradienti e mantenere sincronizzate le variabili. La guida tf.distribute.Strategy contiene maggiori dettagli su questa strategia.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy fornisce implementazioni multiple tramite il parametro tf.distribute.experimental.CommunicationOptions : 1) RING implementa collettivi basati su anello utilizzando gRPC come livello di comunicazione cross-host; 2) NCCL utilizza la NVIDIA Collective Communication Library per implementare i collettivi; e 3) AUTO rinvia la scelta al runtime. La migliore scelta di implementazione collettiva dipende dal numero e dal tipo di GPU e dall'interconnessione di rete nel cluster.

Allena il modello

Con l'integrazione dell'API tf.distribute.Strategy in tf.keras , l'unica modifica che apporterai per distribuire la formazione a più lavoratori è racchiudere la creazione del modello e la chiamata model.compile() all'interno strategy.scope() . L'ambito della strategia di distribuzione determina come e dove vengono create le variabili e, nel caso di MultiWorkerMirroredStrategy , le variabili create sono MirroredVariable e vengono replicate su ciascuno dei lavoratori.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()

Per eseguire effettivamente MultiWorkerMirroredStrategy dovrai eseguire i processi di lavoro e passare loro un TF_CONFIG .

Come il file mnist_setup.py scritto in precedenza, ecco il main.py che ciascuno dei lavoratori eseguirà:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

Nello snippet di codice sopra, nota che global_batch_size , che viene passato a Dataset.batch , è impostato su per_worker_batch_size * num_workers . Ciò garantisce che ogni lavoratore elabori batch di esempi per_worker_batch_size indipendentemente dal numero di lavoratori.

La directory corrente ora contiene entrambi i file Python:

ls *.py
main.py
mnist_setup.py

Quindi json-serializza TF_CONFIG e aggiungilo alle variabili di ambiente:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Ora puoi avviare un processo di lavoro che eseguirà main.py e utilizzerà TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Ci sono alcune cose da notare sul comando precedente:

  1. Usa %%bash che è una "magia" del notebook per eseguire alcuni comandi bash.
  2. Utilizza il flag --bg per eseguire il processo bash in background, perché questo lavoratore non verrà terminato. Aspetta tutti i lavoratori prima di iniziare.

Il processo di lavoro in background non stamperà l'output su questo notebook, quindi &> reindirizza il suo output a un file in modo che tu possa controllare cosa è successo in un file di registro in un secondo momento.

Quindi, attendi qualche secondo affinché il processo si avvii:

import time
time.sleep(10)

Ora, controlla cosa è stato prodotto finora nel file di registro del lavoratore:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

L'ultima riga del file di registro dovrebbe dire: Started server with target: grpc://localhost:12345 . Il primo lavoratore è ora pronto e attende che tutti gli altri lavoratori siano pronti per procedere.

Quindi aggiorna tf_config affinché il processo del secondo lavoratore raccolga:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Lancia il secondo lavoratore. Questo avvierà la formazione poiché tutti i lavoratori sono attivi (quindi non è necessario eseguire il background di questo processo):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.

Se ricontrolli i log scritti dal primo lavoratore, scoprirai che ha partecipato alla formazione di quel modello:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901

Non sorprende che sia stato eseguito più lentamente del test eseguito all'inizio di questo tutorial.

L'esecuzione di più lavoratori su una singola macchina aggiunge solo un sovraccarico.

L'obiettivo qui non era quello di migliorare i tempi di formazione, ma solo di fare un esempio di formazione multi-lavoratore.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Formazione multi-lavoratore in profondità

Finora, hai imparato come eseguire una configurazione multi-lavoratore di base.

Durante il resto del tutorial, imparerai in dettaglio altri fattori, che possono essere utili o importanti per casi d'uso reali.

Partizionamento orizzontale del set di dati

Nella formazione multi-lavoratore, il partizionamento orizzontale del set di dati è necessario per garantire convergenza e prestazioni.

L'esempio nella sezione precedente si basa sull'autosharding predefinito fornito dall'API tf.distribute.Strategy . È possibile controllare lo sharding impostando tf.data.experimental.AutoShardPolicy di tf.data.experimental.DistributeOptions .

Per ulteriori informazioni sull'auto-sharding , fare riferimento alla Guida all'input distribuito .

Ecco un rapido esempio di come disattivare il partizionamento orizzontale automatico, in modo che ogni replica elabori ogni esempio ( non consigliato ):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Valutazione

Se passi i validation_data in Model.fit , si alternerà tra training e valutazione per ogni epoca. La valutazione che prende i dati di validation_data è distribuita sullo stesso insieme di lavoratori ei risultati della valutazione sono aggregati e disponibili per tutti i lavoratori.

Analogamente all'addestramento, il set di dati di convalida viene automaticamente partizionato a livello di file. È necessario impostare una dimensione batch globale nel set di dati di convalida e impostare validation_steps .

Per la valutazione si raccomanda anche un set di dati ripetuto.

In alternativa, puoi anche creare un'altra attività che legge periodicamente i checkpoint ed esegue la valutazione. Questo è ciò che fa Estimator. Ma questo non è un modo consigliato per eseguire la valutazione e quindi i suoi dettagli vengono omessi.

Prestazione

Ora hai un modello Keras che è tutto configurato per l'esecuzione in più worker con MultiWorkerMirroredStrategy .

Per modificare le prestazioni della formazione multi-lavoratore, puoi provare quanto segue:

  • tf.distribute.MultiWorkerMirroredStrategy fornisce più implementazioni di comunicazione collettiva :

    • RING implementa collettivi basati su anello utilizzando gRPC come livello di comunicazione tra host.
    • NCCL utilizza la NVIDIA Collective Communication Library per implementare i collettivi.
    • AUTO rinvia la scelta al runtime.

    La migliore scelta di implementazione collettiva dipende dal numero di GPU, dal tipo di GPU e dall'interconnessione di rete nel cluster. Per ignorare la scelta automatica, specificare il parametro communication_options del costruttore di MultiWorkerMirroredStrategy . Per esempio:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • Trasmetti le variabili su tf.float se possibile:

    • Il modello ufficiale di ResNet include un esempio di come questo può essere fatto.

Tolleranza ai guasti

Nell'addestramento sincrono, il cluster fallirebbe se uno dei lavoratori si guasta e non esiste alcun meccanismo di ripristino degli errori.

L'utilizzo di Keras con tf.distribute.Strategy offre il vantaggio della tolleranza agli errori nei casi in cui i lavoratori muoiono o siano altrimenti instabili. Puoi farlo preservando lo stato di addestramento nel file system distribuito di tua scelta, in modo tale che al riavvio dell'istanza che in precedenza non è riuscita o è stata annullata, lo stato di addestramento venga ripristinato.

Quando un lavoratore diventa non disponibile, gli altri lavoratori falliranno (possibilmente dopo un timeout). In questi casi, il lavoratore non disponibile deve essere riavviato, così come gli altri lavoratori che hanno fallito.

Richiamata ModelCheckpoint

La richiamata ModelCheckpoint non fornisce più la funzionalità di tolleranza agli errori, utilizzare invece la richiamata BackupAndRestore .

La richiamata ModelCheckpoint può ancora essere utilizzata per salvare i checkpoint. Ma con questo, se l'allenamento è stato interrotto o terminato con successo, per continuare l'allenamento dal checkpoint, l'utente è responsabile di caricare manualmente il modello.

Facoltativamente, l'utente può scegliere di salvare e ripristinare il modello/pesi al di fuori della richiamata ModelCheckpoint .

Salvataggio e caricamento del modello

Per salvare il modello utilizzando model.save o tf.saved_model.save , la destinazione di salvataggio deve essere diversa per ogni lavoratore.

  • Per i lavoratori non capi, dovrai salvare il modello in una directory temporanea.
  • Per il capo, dovrai salvare nella directory del modello fornita.

Le directory temporanee sul lavoratore devono essere univoche per evitare errori derivanti da più lavoratori che tentano di scrivere nella stessa posizione.

Il modello salvato in tutte le directory è identico e in genere solo il modello salvato dal capo deve essere referenziato per il ripristino o il servizio.

Dovresti avere una logica di pulizia che elimini le directory temporanee create dai lavoratori una volta completata la tua formazione.

Il motivo per risparmiare sul capo e sui lavoratori allo stesso tempo è perché potresti aggregare variabili durante il checkpoint che richiede che sia il capo che i lavoratori partecipino al protocollo di comunicazione allreduce. D'altra parte, lasciare che capo e lavoratori salvino nella stessa directory del modello comporterà errori dovuti alla contesa.

Utilizzando MultiWorkerMirroredStrategy , il programma viene eseguito su ogni lavoratore e, per sapere se il lavoratore corrente è il capo, sfrutta l'oggetto risolutore del cluster che ha attributi task_type e task_id :

  • task_type ti dice qual è il lavoro corrente (es 'worker' ).
  • task_id ti dice l'identificatore del lavoratore.
  • Il lavoratore con task_id == 0 è designato come capo lavoratore.

Nel frammento di codice seguente, la funzione write_filepath fornisce il percorso del file da scrivere, che dipende dal task_id del lavoratore:

  • Per il capo lavoratore (con task_id == 0 ), scrive nel percorso del file originale.
  • Per gli altri lavoratori, crea una directory temporanea, temp_dir , con task_id nel percorso della directory in cui scrivere:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Con questo, ora sei pronto per salvare:

multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Come descritto sopra, in seguito il modello dovrebbe essere caricato solo dal percorso salvato dal capo, quindi rimuoviamo quelli temporanei salvati dai non capi:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Ora, quando è il momento di caricare, utilizziamo la comoda API tf.keras.models.load_model e continuiamo con ulteriore lavoro.

Qui, supponi di utilizzare solo un singolo lavoratore per caricare e continuare l'addestramento, nel qual caso non chiami tf.keras.models.load_model all'interno di un altro strategy.scope() (nota che strategy = tf.distribute.MultiWorkerMirroredStrategy() , come definito in precedenza ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773
<keras.callbacks.History at 0x7f6669989750>

Salvataggio e ripristino del checkpoint

D'altra parte, il checkpoint ti consente di salvare i pesi del tuo modello e ripristinarli senza dover salvare l'intero modello.

Qui creerai un tf.train.Checkpoint che tiene traccia del modello, che è gestito da tf.train.CheckpointManager , in modo che venga preservato solo l'ultimo checkpoint:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Una volta impostato CheckpointManager , sei pronto per salvare e rimuovere i checkpoint salvati dai non capi:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Ora, quando devi ripristinare il modello, puoi trovare l'ultimo checkpoint salvato utilizzando la comoda funzione tf.train.latest_checkpoint . Dopo aver ripristinato il checkpoint, puoi continuare con l'allenamento.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/2
2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938
<keras.callbacks.History at 0x7f6669589850>

Richiamata di backup e ripristino

Il callback tf.keras.callbacks.BackupAndRestore fornisce la funzionalità di tolleranza agli errori eseguendo il backup del modello e del numero di epoch corrente in un file di checkpoint temporaneo nell'argomento BackupAndRestore backup_dir Questo viene fatto alla fine di ogni epoca.

Una volta che i lavori vengono interrotti e riavviati, la richiamata ripristina l'ultimo checkpoint e la formazione continua dall'inizio dell'epoca interrotta. Qualsiasi addestramento parziale già fatto nell'epoca incompiuta prima dell'interruzione verrà buttato via, in modo che non influisca sullo stato finale del modello.

Per usarlo, fornisci un'istanza di tf.keras.callbacks.BackupAndRestore alla chiamata Model.fit .

Con MultiWorkerMirroredStrategy , se un lavoratore viene interrotto, l'intero cluster si interrompe fino al riavvio del lavoratore interrotto. Anche gli altri worker verranno riavviati e il worker interrotto si unirà nuovamente al cluster. Quindi, ogni lavoratore legge il file del checkpoint precedentemente salvato e riprende il suo stato precedente, consentendo così al cluster di tornare sincronizzato. Poi, la formazione continua.

La richiamata BackupAndRestore utilizza CheckpointManager per salvare e ripristinare lo stato di addestramento, che genera un file chiamato checkpoint che tiene traccia dei checkpoint esistenti insieme a quello più recente. Per questo motivo, backup_dir non dovrebbe essere riutilizzato per memorizzare altri checkpoint al fine di evitare collisioni di nomi.

Attualmente, il callback BackupAndRestore supporta la formazione per singolo lavoratore senza strategia, MirroredStrategy , e la formazione per più lavoratori con MultiWorkerMirroredStrategy .

Di seguito sono riportati due esempi sia per la formazione multi-operatore che per la formazione mono-operatore:

# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614
<keras.callbacks.History at 0x7f6669555d90>

Se controlli la directory di backup_dir specificata in BackupAndRestore , potresti notare alcuni file di checkpoint generati temporaneamente. Questi file sono necessari per recuperare le istanze perse in precedenza e verranno rimossi dalla libreria alla fine di Model.fit al termine della formazione.

Risorse addizionali

  1. La guida Formazione distribuita in TensorFlow fornisce una panoramica delle strategie di distribuzione disponibili.
  2. L' esercitazione Ciclo di formazione personalizzato con Keras e MultiWorkerMirroredStrategy mostra come usare MultiWorkerMirroredStrategy con Keras e un ciclo di formazione personalizzato.
  3. Scopri i modelli ufficiali , molti dei quali possono essere configurati per eseguire più strategie di distribuzione.
  4. La guida Prestazioni migliori con tf.function fornisce informazioni su altre strategie e strumenti, come TensorFlow Profiler che puoi utilizzare per ottimizzare le prestazioni dei tuoi modelli TensorFlow.