Visualizza su TensorFlow.org | Esegui in Google Colab | Visualizza l'origine su GitHub | Scarica quaderno |
Panoramica
L' addestramento del server dei parametri è un metodo parallelo ai dati comune per aumentare l'addestramento del modello su più macchine.
Un cluster di formazione del server dei parametri è costituito da lavoratori e server dei parametri . Le variabili vengono create sui server dei parametri e vengono lette e aggiornate dagli operatori in ogni passaggio. Per impostazione predefinita, i lavoratori leggono e aggiornano queste variabili in modo indipendente senza sincronizzarsi tra loro. Questo è il motivo per cui a volte il training in stile server dei parametri viene chiamato training asincrono .
In TensorFlow 2, l'addestramento del server dei parametri è basato sulla classe tf.distribute.experimental.ParameterServerStrategy
, che distribuisce i passaggi dell'addestramento a un cluster scalabile fino a migliaia di lavoratori (accompagnato da server dei parametri).
Metodi di formazione supportati
Esistono due principali metodi di formazione supportati:
- L'API Keras
Model.fit
, consigliata quando si preferisce un'astrazione e una gestione dell'allenamento di alto livello. - Un ciclo di formazione personalizzato (puoi fare riferimento a Formazione personalizzata , Scrittura di un ciclo di formazione da zero e Ciclo di formazione personalizzato con Keras e MultiWorkerMirroredStrategy per maggiori dettagli). Quando si preferisce definire i dettagli del ciclo di formazione, si consiglia un ciclo di formazione personalizzato.
Un cluster con lavori e attività
Indipendentemente dall'API scelta ( Model.fit
o un ciclo di formazione personalizzato), la formazione distribuita in TensorFlow 2 prevede: un 'cluster'
con diversi 'jobs'
e ciascuno dei lavori può avere uno o più 'tasks'
.
Quando si utilizza l'addestramento del server dei parametri, si consiglia di disporre di:
- Un lavoro di coordinatore (che ha il nome del lavoro
chief
) - Lavori con più lavoratori (nome del lavoro
worker
); e - Più lavori del server di parametri (nome lavoro
ps
)
Mentre il coordinatore crea risorse, invia attività di formazione, scrive checkpoint e si occupa degli errori delle attività, i lavoratori e i server dei parametri eseguono tf.distribute.Server
che ascolta le richieste del coordinatore.
Addestramento del server dei parametri con l'API Model.fit
L'addestramento del server dei parametri con l'API Model.fit
richiede che il coordinatore utilizzi un oggetto tf.distribute.experimental.ParameterServerStrategy
e un tf.keras.utils.experimental.DatasetCreator
come input. Simile all'utilizzo di Model.fit
senza strategia o con altre strategie, il flusso di lavoro prevede la creazione e la compilazione del modello, la preparazione dei callback, seguiti da una chiamata Model.fit
.
Training del server dei parametri con un ciclo di training personalizzato
Con cicli di formazione personalizzati, la classe tf.distribute.experimental.coordinator.ClusterCoordinator
è il componente chiave utilizzato per il coordinatore.
- La classe
ClusterCoordinator
deve funzionare insieme a un oggettotf.distribute.Strategy
. - Questo oggetto
tf.distribute.Strategy
è necessario per fornire le informazioni del cluster e viene utilizzato per definire un passaggio di addestramento, come illustrato in Formazione personalizzata con tf.distribute.Strategy . - L'oggetto
ClusterCoordinator
invia quindi l'esecuzione di questi passaggi di formazione ai lavoratori remoti. - Per l'addestramento del server dei parametri,
ClusterCoordinator
deve lavorare con untf.distribute.experimental.ParameterServerStrategy
.
L'API più importante fornita dall'oggetto ClusterCoordinator
è schedule
:
- L'API di
schedule
accoda unatf.function
e restituisce immediatamente unRemoteValue
simile al futuro. - Le funzioni in coda verranno inviate ai lavoratori remoti nei thread in background e i
RemoteValue
verranno riempiti in modo asincrono. - Poiché la
schedule
non richiede l'assegnazione di un lavoratore, latf.function
passata può essere eseguita su qualsiasi lavoratore disponibile. - Se il lavoratore su cui viene eseguito diventa non disponibile prima del suo completamento, la funzione verrà ritentata su un altro lavoratore disponibile.
- A causa di questo fatto e del fatto che l'esecuzione della funzione non è atomica, una funzione può essere eseguita più di una volta.
Oltre a inviare funzioni remote, ClusterCoordinator
aiuta anche a creare set di dati su tutti i lavoratori e ricostruire questi set di dati quando un lavoratore si riprende da un guasto.
Configurazione dell'esercitazione
Il tutorial si dirama in Model.fit
e percorsi di ciclo di formazione personalizzati e puoi scegliere quello che si adatta alle tue esigenze. Sezioni diverse da "Formazione con X" sono applicabili a entrambi i percorsi.
pip install portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
Configurazione del cluster
Come accennato in precedenza, un cluster di formazione del server dei parametri richiede un'attività del coordinatore che esegua il programma di formazione, uno o più lavoratori e attività del server dei parametri che eseguono i server TensorFlow — tf.distribute.Server
— ed eventualmente un'attività di valutazione aggiuntiva che esegua la valutazione side-car (vedi la sezione di valutazione del sidecar di seguito). I requisiti per configurarli sono:
- L'attività del coordinatore deve conoscere gli indirizzi e le porte di tutti gli altri server TensorFlow ad eccezione del valutatore.
- Gli operatori e i server dei parametri devono sapere quale porta devono ascoltare. Per motivi di semplicità, di solito è possibile trasferire le informazioni complete sul cluster durante la creazione di server TensorFlow su queste attività.
- L'attività di valutazione non deve conoscere l'impostazione del cluster di formazione. In tal caso, non dovrebbe tentare di connettersi al cluster di formazione.
- I ruoli di lavoro e i server dei parametri devono avere tipi di attività rispettivamente come
"worker"
e"ps"
. Il coordinatore dovrebbe utilizzare"chief"
come tipo di attività per motivi legacy.
In questo tutorial creerai un cluster in-process in modo che l'intero training del server dei parametri possa essere eseguito in Colab. Imparerai come impostare cluster reali in una sezione successiva.
Cluster in corso
Inizierai creando diversi server TensorFlow in anticipo e ti connetterai ad essi in un secondo momento. Nota che questo è solo per lo scopo della dimostrazione di questo tutorial e nella formazione reale i server verranno avviati su macchine "worker"
e "ps"
.
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name="worker",
task_index=i,
config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec,
job_name="ps",
task_index=i,
protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
La configurazione del cluster in-process viene spesso utilizzata negli unit test, come qui .
Un'altra opzione per i test locali consiste nell'avviare i processi sul computer locale: per un esempio di questo approccio, consulta Formazione multi-lavoratore con Keras .
Istanziare una ParameterServerStrategy
Prima di approfondire il codice di addestramento, istanziare un oggetto ParameterServerStrategy
. Tieni presente che ciò è necessario indipendentemente dal fatto che tu stia procedendo con Model.fit
o con un ciclo di formazione personalizzato. L'argomento variable_partitioner
verrà spiegato nella sezione Partizionamento variabile delle variabili.
variable_partitioner = (
tf.distribute.experimental.partitioners.MinSizePartitioner(
min_shard_bytes=(256 << 10),
max_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0' INFO:tensorflow:Number of GPUs on workers: 1
Per utilizzare le GPU per la formazione, alloca le GPU visibili a ciascun lavoratore. ParameterServerStrategy
utilizzerà tutte le GPU disponibili su ogni lavoratore, con la limitazione che tutti i lavoratori dovrebbero avere lo stesso numero di GPU disponibili.
Frazionamento variabile
Il partizionamento orizzontale delle variabili si riferisce alla divisione di una variabile in più variabili più piccole, chiamate shard . Il partizionamento orizzontale variabile può essere utile per distribuire il carico di rete durante l'accesso a questi frammenti. È anche utile distribuire il calcolo e l'archiviazione di una variabile normale su più server di parametri.
Per abilitare il partizionamento orizzontale delle variabili, puoi passare un variable_partitioner
durante la costruzione di un oggetto ParameterServerStrategy
. Il variable_partitioner
verrà invocato ogni volta che viene creata una variabile e dovrebbe restituire il numero di shard lungo ciascuna dimensione della variabile. Vengono forniti alcuni variable_partitioner
predefiniti come tf.distribute.experimental.partitioners.MinSizePartitioner
. Si consiglia di utilizzare partizionatori basati sulle dimensioni come tf.distribute.experimental.partitioners.MinSizePartitioner
per evitare il partizionamento di piccole variabili, che potrebbero avere un impatto negativo sulla velocità di training del modello.
Quando viene passato un variable_partitioner
e se crei una variabile direttamente in strategy.scope()
, diventerà un tipo di contenitore con una proprietà delle variables
che fornisce l'accesso all'elenco degli shard. Nella maggior parte dei casi, questo contenitore verrà automaticamente convertito in un Tensor concatenando tutti gli shard. Di conseguenza, può essere utilizzata come una variabile normale. D'altra parte, alcuni metodi TensorFlow come tf.nn.embedding_lookup
forniscono un'implementazione efficiente per questo tipo di contenitore e in questi metodi verrà evitata la concatenazione automatica.
Consulta i documenti API di tf.distribute.experimental.ParameterServerStrategy
per maggiori dettagli.
Allenarsi con Model.fit
Keras fornisce un'API di addestramento di facile utilizzo tramite Model.fit
che gestisce il ciclo di addestramento sotto il cofano, con la flessibilità di train_step
e callback, che forniscono funzionalità come il salvataggio del checkpoint o il salvataggio di riepilogo per TensorBoard. Con Model.fit
, lo stesso codice di addestramento può essere utilizzato per altre strategie con un semplice scambio dell'oggetto strategia.
Dati in ingresso
Model.fit
con il training del server dei parametri richiede che i dati di input siano forniti in un callable che accetta un singolo argomento di tipo tf.distribute.InputContext
e restituisce un tf.data.Dataset
. Quindi, crea un oggetto tf.keras.utils.experimental.DatasetCreator
che accetta tale callable
e un oggetto tf.distribute.InputOptions
opzionale tramite l'argomento input_options
.
Si noti che si consiglia di mescolare e ripetere i dati con l'addestramento del server dei parametri e specificare steps_per_epoch
nella chiamata fit
in modo che la libreria conosca i limiti di epoch.
Consulta l'esercitazione sull'input distribuito per ulteriori informazioni sull'argomento InputContext
.
def dataset_fn(input_context):
global_batch_size = 64
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.shard(
input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2)
return dataset
dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
Il codice in dataset_fn
verrà richiamato sul dispositivo di input, che di solito è la CPU, su ciascuna delle macchine worker.
Costruzione e compilazione del modello
Ora creerai un tf.keras.Model
, un banale modello tf.keras.models.Sequential
a scopo dimostrativo, seguito da una chiamata Model.compile
per incorporare componenti, come un ottimizzatore, metriche o parametri come steps_per_execution
:
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)
Richiami e formazione
Prima di chiamare model.fit
per la formazione vera e propria, prepariamo i callback necessari per attività comuni, come ad esempio:
-
ModelCheckpoint
: per salvare i pesi del modello. -
BackupAndRestore
: per assicurarsi che l'avanzamento della formazione venga automaticamente eseguito il backup e ripristinato se il cluster si verifica indisponibilità (come interruzione o prelazione); o -
TensorBoard
: per salvare i rapporti di avanzamento in file di riepilogo, che vengono visualizzati nello strumento TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=log_dir),
tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step Epoch 2/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step Epoch 3/5 WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. 20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step Epoch 4/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step Epoch 5/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step <keras.callbacks.History at 0x7f89984ca890>
Utilizzo diretto con ClusterCoordinator
(opzionale)
Anche se scegli il percorso di formazione Model.fit
, puoi opzionalmente istanziare un oggetto tf.distribute.experimental.coordinator.ClusterCoordinator
per programmare altre funzioni che vorresti fossero eseguite sui lavoratori. Per ulteriori dettagli ed esempi, vedere la sezione Formazione con un ciclo di formazione personalizzato .
Allenamento con un ciclo di allenamento personalizzato
L'utilizzo di cicli di formazione personalizzati con tf.distribute.Strategy
offre una grande flessibilità per definire i cicli di formazione. Con la ParameterServerStrategy
definita sopra (come strategy
), utilizzerai un tf.distribute.experimental.coordinator.ClusterCoordinator
per inviare l'esecuzione dei passaggi di formazione ai lavoratori remoti.
Quindi, creerai un modello, definirai un set di dati e una funzione di passaggio, come hai fatto nel ciclo di formazione con altri tf.distribute.Strategy
s. Puoi trovare maggiori dettagli nel tutorial Formazione personalizzata con tf.distribute.Strategy .
Per garantire un precaricamento efficiente dei set di dati, utilizzare le API di creazione di set di dati distribuite consigliate menzionate nella sezione Invio dei passaggi di formazione ai lavoratori remoti di seguito. Inoltre, assicurati di chiamare Strategy.run
all'interno di worker_fn
per sfruttare appieno le GPU assegnate ai lavoratori. Il resto dei passaggi è lo stesso per l'allenamento con o senza GPU.
Creiamo questi componenti nei seguenti passaggi:
Imposta i dati
Innanzitutto, scrivi una funzione che crei un set di dati che includa la logica di preelaborazione implementata dai livelli di preelaborazione di Keras .
Creerai questi livelli al di fuori del dataset_fn
ma applicherai la trasformazione all'interno del dataset_fn
, poiché avvolgerai il dataset_fn
in un tf.function
, che non consente la creazione di variabili al suo interno.
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=feature_vocab,
mask_token=None)
label_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = tf.keras.layers.Input(
shape=(3,),
dtype=tf.string,
name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = tf.keras.Model(
{"features": raw_feature_input},
feature_id_input)
raw_label_input = tf.keras.layers.Input(
shape=(1,),
dtype=tf.string,
name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = tf.keras.Model(
{"label": raw_label_input},
label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison return bool(asarray(a1 == a2).all())
Genera esempi di giocattoli in un set di dati:
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
Quindi, crea il set di dati di addestramento racchiuso in un dataset_fn
:
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
Costruisci il modello
Quindi, crea il modello e altri oggetti. Assicurati di creare tutte le variabili in strategy.scope
.
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with Keras processing layers.
model_input = tf.keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = tf.keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
model = tf.keras.Model({"features": model_input}, dense_output)
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
accuracy = tf.keras.metrics.Accuracy()
Confermiamo che l'uso di FixedShardsPartitioner
diviso tutte le variabili in due shard e ogni shard è stato assegnato a diversi server di parametri:
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
Definisci la fase di formazione
Terzo, crea la fase di formazione racchiusa in una tf.function
:
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
Nella funzione del passaggio di addestramento sopra, la chiamata di Strategy.run
e Strategy.reduce
in step_fn
può supportare più GPU per lavoratore. Se ai lavoratori sono assegnate GPU, Strategy.run
distribuirà i set di dati su più repliche.
Inviare le fasi di formazione ai lavoratori remoti
Dopo che tutti i calcoli sono stati definiti da ParameterServerStrategy
, utilizzerai la classe tf.distribute.experimental.coordinator.ClusterCoordinator
per creare risorse e distribuire i passaggi di formazione ai lavoratori remoti.
Creiamo prima un oggetto ClusterCoordinator
e passiamo l'oggetto strategia:
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
Quindi, crea un set di dati per lavoratore e un iteratore. In per_worker_dataset_fn
seguito, si consiglia di eseguire il wrapping di dataset_fn
in strategy.distribute_datasets_from_function
per consentire un precaricamento efficiente alle GPU senza interruzioni.
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Il passaggio finale consiste nel distribuire il calcolo ai lavoratori remoti utilizzando ClusterCoordinator.schedule
:
- Il metodo di
schedule
accoda unatf.function
e restituisce immediatamente unRemoteValue
simile al futuro. Le funzioni in coda verranno inviate ai lavoratori remoti nei thread in background eRemoteValue
verrà compilato in modo asincrono. - Il metodo
join
(ClusterCoordinator.join
) può essere utilizzato per attendere l'esecuzione di tutte le funzioni pianificate.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). Finished epoch 0, accuracy is 0.543750. Finished epoch 1, accuracy is 0.543750. Finished epoch 2, accuracy is 0.950000. Finished epoch 3, accuracy is 1.000000.
Ecco come recuperare il risultato di un RemoteValue
:
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000
In alternativa, puoi avviare tutti i passaggi e fare qualcosa in attesa del completamento:
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
Per il flusso di lavoro completo di formazione e servizio per questo particolare esempio, dai un'occhiata a questo test .
Ulteriori informazioni sulla creazione di set di dati
Il set di dati nel codice precedente viene creato utilizzando l'API ClusterCoordinator.create_per_worker_dataset
). Crea un set di dati per lavoratore e restituisce un oggetto contenitore. Puoi chiamare il metodo iter
su di esso per creare un iteratore per lavoratore. L'iteratore per lavoratore contiene un iteratore per lavoratore e la sezione corrispondente di un lavoratore verrà sostituita nell'argomento di input della funzione passata al metodo ClusterCoordinator.schedule
prima che la funzione venga eseguita su un particolare lavoratore.
Attualmente, il metodo ClusterCoordinator.schedule
presuppone che i lavoratori siano equivalenti e quindi presuppone che i set di dati su lavoratori diversi siano gli stessi, tranne per il fatto che potrebbero essere mescolati in modo diverso se contengono un'operazione Dataset.shuffle
. Per questo motivo, si consiglia inoltre di ripetere i set di dati all'infinito e di pianificare un numero limitato di passaggi invece di fare affidamento su OutOfRangeError
da un set di dati.
Un'altra nota importante è che i set di dati tf.data
non supportano la serializzazione e la deserializzazione implicite attraverso i limiti delle attività. Quindi è importante creare l'intero set di dati all'interno della funzione passata a ClusterCoordinator.create_per_worker_dataset
.
Valutazione
Esiste più di un modo per definire ed eseguire un ciclo di valutazione nella formazione distribuita. Ognuno ha i suoi pro e contro, come descritto di seguito. Il metodo di valutazione in linea è consigliato se non hai una preferenza.
Valutazione in linea
In questo metodo, il coordinatore alterna formazione e valutazione e quindi viene chiamato valutazione in linea .
Ci sono diversi vantaggi della valutazione in linea. Per esempio:
- Può supportare grandi modelli di valutazione e set di dati di valutazione che una singola attività non può contenere.
- I risultati della valutazione possono essere utilizzati per prendere decisioni per l'allenamento dell'epoca successiva.
Esistono due modi per implementare la valutazione in linea: valutazione diretta e valutazione distribuita.
- Valutazione diretta : per piccoli modelli e set di dati di valutazione, il coordinatore può eseguire la valutazione direttamente sul modello distribuito con il set di dati di valutazione sul coordinatore:
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = tf.keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
- Valutazione distribuita : per modelli o set di dati di grandi dimensioni che non è possibile eseguire direttamente sul coordinatore, l'attività del coordinatore può distribuire compiti di valutazione ai lavoratori tramite i metodi
ClusterCoordinator.schedule
/ClusterCoordinator.join
:
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = tf.keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources Evaluation accuracy: 1.000000
Valutazione sidecar
Un altro metodo è chiamato valutazione side-car in cui si crea un'attività di valutazione dedicata che legge ripetutamente i checkpoint ed esegue la valutazione su un checkpoint più recente. Consente al programma di allenamento di terminare in anticipo se non è necessario modificare il ciclo di allenamento in base ai risultati della valutazione. Tuttavia, richiede un'attività di valutazione aggiuntiva e un checkpoint periodico per attivare la valutazione. Di seguito è riportato un possibile ciclo di valutazione del sidecar:
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epoches)):
break
Cluster nel mondo reale
In un ambiente di produzione reale, eseguirai tutte le attività in processi diversi su macchine diverse. Il modo più semplice per configurare le informazioni del cluster su ciascuna attività consiste nell'impostare le variabili di ambiente "TF_CONFIG"
e utilizzare un tf.distribute.cluster_resolver.TFConfigClusterResolver
per analizzare "TF_CONFIG"
.
Per una descrizione generale delle variabili di ambiente "TF_CONFIG"
, fare riferimento alla Guida alla formazione distribuita .
Se inizi le tue attività di formazione utilizzando Kubernetes o altri modelli di configurazione, è molto probabile che questi modelli abbiano già impostato “TF_CONFIG"
per te.
Impostare la variabile di ambiente "TF_CONFIG"
.
Supponiamo di avere 3 lavoratori e 2 server dei parametri, il "TF_CONFIG"
del lavoratore 1 può essere:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
Il "TF_CONFIG"
del valutatore può essere:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
La parte "cluster"
nella stringa "TF_CONFIG"
sopra per il valutatore è facoltativa.
Se usi lo stesso binario per tutte le attività
Se preferisci eseguire tutte queste attività utilizzando un singolo binario, all'inizio dovrai lasciare che il tuo programma si ramifichi in ruoli diversi:
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# Run side-car evaluation
else:
# Run the coordinator.
Il codice seguente avvia un server TensorFlow e attende:
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
Gestione del fallimento dell'attività
Fallimento del lavoratore
tf.distribute.experimental.coordinator.ClusterCoordinator
o Model.fit
forniscono una tolleranza agli errori incorporata per il fallimento del lavoratore. Al momento del ripristino del lavoratore, la funzione del set di dati fornita in precedenza (a ClusterCoordinator.create_per_worker_dataset
per un ciclo di addestramento personalizzato o tf.keras.utils.experimental.DatasetCreator
per Model.fit
) verrà richiamata sui lavoratori per ricreare i set di dati.
Errore del server dei parametri o del coordinatore
Tuttavia, quando il coordinatore vede un errore del server dei parametri, solleverà immediatamente un UnavailableError
o AbortedError
. In questo caso puoi riavviare il coordinatore. Anche il coordinatore stesso può diventare non disponibile. Pertanto, si consiglia di utilizzare alcuni strumenti per non perdere i progressi della formazione:
Per
Model.fit
, dovresti usare una richiamataBackupAndRestore
, che gestisce automaticamente il salvataggio e il ripristino dell'avanzamento. Per un esempio, vedere la sezione Richiamate e formazione sopra.Per un ciclo di addestramento personalizzato, è necessario controllare periodicamente le variabili del modello e caricare le variabili del modello da un punto di controllo, se presente, prima dell'inizio dell'addestramento. L'avanzamento dell'addestramento può essere dedotto approssimativamente
optimizer.iterations
se viene eseguito un checkpoint di un ottimizzatore:
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epoches):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
Recupero di un RemoteValue
Il recupero di un RemoteValue
è garantito se una funzione viene eseguita correttamente. Questo perché attualmente il valore restituito viene immediatamente copiato nel coordinatore dopo l'esecuzione di una funzione. Se si verifica un errore di lavoro durante la copia, la funzione verrà ritentata su un altro lavoratore disponibile. Pertanto, se si desidera ottimizzare le prestazioni, è possibile pianificare le funzioni senza un valore restituito.
Segnalazione errori
Una volta che il coordinatore vede un errore come UnavailableError
dai server dei parametri o altri errori dell'applicazione come InvalidArgument
da tf.debugging.check_numerics
, cancellerà tutte le funzioni in sospeso e in coda prima di generare l'errore. Recuperare i loro RemoteValue
corrispondenti solleverà un CancelledError
.
Dopo che è stato segnalato un errore, il coordinatore non solleverà lo stesso errore o qualsiasi errore dalle funzioni annullate.
Miglioramento delle prestazioni
Esistono diversi possibili motivi se si verificano problemi di prestazioni durante l'allenamento con ParameterServerStrategy
e ClusterResolver
.
Un motivo comune è che i server dei parametri hanno un carico sbilanciato e alcuni server dei parametri molto caricati hanno raggiunto la capacità. Possono esserci anche più cause alla radice. Alcuni semplici metodi per mitigare questo problema sono:
- Shard le tue variabili di modello di grandi dimensioni specificando un
variable_partitioner
durante la costruzione di unParameterServerStrategy
. - Se possibile, evitare di creare una variabile hotspot richiesta da tutti i server dei parametri in un unico passaggio. Ad esempio, utilizzare un tasso di apprendimento costante o una sottoclasse
tf.keras.optimizers.schedules.LearningRateSchedule
negli ottimizzatori poiché il comportamento predefinito è che il tasso di apprendimento diventerà una variabile posizionata su un particolare server di parametri e richiesta da tutti gli altri server di parametri in ogni passaggio . - Mescola i tuoi grandi vocabolari prima di passarli ai livelli di preelaborazione di Keras.
Un'altra possibile ragione per problemi di prestazioni è il coordinatore. La tua prima implementazione di schedule
/ join
è basata su Python e quindi potrebbe avere un sovraccarico di threading. Anche la latenza tra il coordinatore e i lavoratori può essere grande. Se questo è il caso,
Per
Model.fit
, puoi impostare l'argomentosteps_per_execution
fornito inModel.compile
su un valore maggiore di 1.Per un ciclo di formazione personalizzato, puoi raggruppare più passaggi in un unico
tf.function
:
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
Poiché la libreria è ulteriormente ottimizzata, si spera che la maggior parte degli utenti non debba impacchettare manualmente i passaggi in futuro.
Inoltre, un piccolo trucco per il miglioramento delle prestazioni consiste nel pianificare le funzioni senza un valore restituito, come spiegato nella sezione precedente relativa alla gestione degli errori dell'attività.
Limiti noti
La maggior parte delle limitazioni note sono già trattate nelle sezioni precedenti. Questa sezione fornisce un riepilogo.
ParameterServerStrategy
generale
-
os.environment["grpc_fail_fast"]="use_caller"
è necessario in ogni attività, incluso il coordinatore, per far funzionare correttamente la tolleranza agli errori. - L'addestramento sincrono del server dei parametri non è supportato.
- Di solito è necessario raggruppare più passaggi in un'unica funzione per ottenere prestazioni ottimali.
- Non è supportato caricare un modello_salvato tramite
tf.saved_model.load
contenente variabili partizionate. Nota che il caricamento di un tale modello_salvato utilizzando TensorFlow Serving dovrebbe funzionare. - Non è supportato caricare un checkpoint contenente variabili di slot dell'ottimizzatore partizionato in un numero diverso di partizioni.
- Non è supportato il ripristino da un errore del server dei parametri senza riavviare l'attività del coordinatore.
- L'utilizzo di
tf.lookup.StaticHashTable
(che è comunemente impiegato da alcuni livelli di preelaborazione Keras, cometf.keras.layers.IntegerLookup
,tf.keras.layers.StringLookup
etf.keras.layers.TextVectorization
) determina risorse posizionate su il coordinatore in questo momento con la formazione del server dei parametri. Ciò ha implicazioni sulle prestazioni per la ricerca degli RPC dai lavoratori al coordinatore. Questa è un'alta priorità attuale da affrontare.
Specifiche Model.fit
- L'argomento
steps_per_epoch
è richiesto inModel.fit
. È possibile selezionare un valore che fornisca intervalli appropriati in un'epoca. -
ParameterServerStrategy
non supporta i callback personalizzati con chiamate a livello di batch per motivi di prestazioni. Dovresti convertire quelle chiamate in chiamate a livello di epoca consteps_per_epoch
opportunamente selezionati, in modo che siano chiamate ogni numero disteps_per_epoch
di passi. I callback incorporati non sono interessati: le loro chiamate a livello di batch sono state modificate per essere performanti. È in programma il supporto delle chiamate a livello di batch perParameterServerStrategy
. - Per lo stesso motivo, a differenza di altre strategie, la barra di avanzamento e le metriche vengono registrate solo ai limiti dell'epoca.
-
run_eagerly
non è supportato.
Specifiche del ciclo di formazione personalizzato
-
ClusterCoordinator.schedule
non supporta le garanzie di visita per un set di dati. - Quando viene utilizzato
ClusterCoordinator.create_per_worker_dataset
, l'intero set di dati deve essere creato all'interno della funzione passata ad esso. -
tf.data.Options
viene ignorato in un set di dati creato daClusterCoordinator.create_per_worker_dataset
.