Visualizza su TensorFlow.org | Esegui in Google Colab | Visualizza l'origine su GitHub | Scarica quaderno |
Panoramica
Questo tutorial illustra la formazione per più lavoratori con l'API del ciclo di formazione personalizzata, distribuita tramite MultiWorkerMirroredStrategy, in modo che un modello Keras progettato per essere eseguito su un singolo lavoratore possa funzionare senza problemi su più lavoratori con una modifica minima del codice.
Utilizziamo cicli di allenamento personalizzati per addestrare il nostro modello perché ci danno flessibilità e un maggiore controllo sull'allenamento. Inoltre, è più semplice eseguire il debug del modello e del ciclo di addestramento. Informazioni più dettagliate sono disponibili in Scrittura di un ciclo di formazione da zero .
Se stai cercando come utilizzare MultiWorkerMirroredStrategy
con keras model.fit
, fai invece riferimento a questo tutorial .
La guida Distributed Training in TensorFlow è disponibile per una panoramica delle strategie di distribuzione supportate da TensorFlow per coloro che sono interessati a una comprensione più approfondita delle API tf.distribute.Strategy
.
Impostare
Innanzitutto, alcune importazioni necessarie.
import json
import os
import sys
Prima di importare TensorFlow, apportare alcune modifiche all'ambiente.
Disabilita tutte le GPU. Ciò previene gli errori causati dai lavoratori che tentano tutti di utilizzare la stessa GPU. Per un'applicazione reale ogni lavoratore si troverebbe su una macchina diversa.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
Reimposta la variabile di ambiente TF_CONFIG
, ne vedrai di più in seguito.
os.environ.pop('TF_CONFIG', None)
Assicurati che la directory corrente sia sul percorso di Python. Ciò consente al notebook di importare i file scritti da %%writefile
secondo momento.
if '.' not in sys.path:
sys.path.insert(0, '.')
Ora importa TensorFlow.
import tensorflow as tf
Dataset e definizione del modello
Quindi crea un file mnist.py
con un modello semplice e una configurazione del set di dati. Questo file python verrà utilizzato dai processi di lavoro in questo tutorial:
%%writefile mnist.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the range [0, 255].
# You need to convert them to float32 with values in the range [0, 1]
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000)
return train_dataset
def dataset_fn(global_batch_size, input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = mnist_dataset(batch_size)
dataset = dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
return dataset
def build_cnn_model():
return tf.keras.Sequential([
tf.keras.Input(shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
Writing mnist.py
Configurazione multi-operatore
Entriamo ora nel mondo della formazione multi-lavoratore. In TensorFlow, la variabile di ambiente TF_CONFIG
è richiesta per l'addestramento su più macchine, ognuna delle quali può avere un ruolo diverso. TF_CONFIG
utilizzata di seguito è una stringa JSON utilizzata per specificare la configurazione del cluster su ogni lavoratore che fa parte del cluster. Questo è il metodo predefinito per specificare un cluster, utilizzando cluster_resolver.TFConfigClusterResolver
, ma sono disponibili altre opzioni nel modulo distribute.cluster_resolver
.
Descrivi il tuo cluster
Ecco un esempio di configurazione:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
Ecco lo stesso TF_CONFIG
serializzato come stringa JSON:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
Ci sono due componenti di TF_CONFIG
: cluster
e task
.
cluster
è lo stesso per tutti i lavoratori e fornisce informazioni sul cluster di formazione, che è un dict costituito da diversi tipi di lavoro comeworker
. Nella formazione per più lavoratori conMultiWorkerMirroredStrategy
, di solito c'è unworker
che si assume un po' più di responsabilità come il salvataggio del checkpoint e la scrittura di un file di riepilogo per TensorBoard oltre a ciò che fa un normaleworker
. Tale lavoratore è indicato comechief
lavoratore ed è consuetudine che ilworker
conindex
0 sia nominato capoworker
(infatti è così che viene implementatotf.distribute.Strategy
).l'
task
fornisce informazioni sull'attività corrente ed è diversa per ogni lavoratore. Specifica iltype
e l'index
di quel lavoratore.
In questo esempio, imposti il type
di attività su "worker"
e l' index
di attività su 0
. Questa macchina è il primo lavoratore e sarà nominato capo lavoratore e farà più lavoro degli altri. Si noti che anche altre macchine dovranno avere la variabile di ambiente TF_CONFIG
impostata, e dovrebbe avere lo stesso cluster
dict, ma diverso type
di attività o index
di attività a seconda dei ruoli di quelle macchine.
A scopo illustrativo, questo tutorial mostra come è possibile impostare un TF_CONFIG
con 2 worker su localhost
. In pratica, gli utenti creerebbero più lavoratori su indirizzi/porte IP esterni e imposterebbero TF_CONFIG
su ciascun lavoratore in modo appropriato.
In questo esempio utilizzerai 2 lavoratori, il TF_CONFIG
del primo lavoratore è mostrato sopra. Per il secondo lavoratore devi impostare tf_config['task']['index']=1
Sopra, tf_config
è solo una variabile locale in python. Per utilizzarlo effettivamente per configurare l'addestramento, questo dizionario deve essere serializzato come JSON e inserito nella variabile di ambiente TF_CONFIG
.
Variabili d'ambiente e sottoprocessi nei notebook
I sottoprocessi ereditano le variabili di ambiente dal loro genitore. Quindi, se imposti una variabile di ambiente in questo processo jupyter notebook
:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
È possibile accedere alla variabile di ambiente da un sottoprocesso:
echo ${GREETINGS}
Hello TensorFlow!
Nella prossima sezione, lo utilizzerai per passare TF_CONFIG
ai sottoprocessi di lavoro. Non avvieresti mai i tuoi lavori in questo modo, ma è sufficiente per gli scopi di questo tutorial: Per dimostrare un esempio minimo di più lavoratori.
MultiWorkerMirroredStrategy
Per addestrare il modello, usa un'istanza di tf.distribute.MultiWorkerMirroredStrategy
, che crea copie di tutte le variabili nei livelli del modello su ogni dispositivo in tutti i worker. La guida tf.distribute.Strategy
contiene maggiori dettagli su questa strategia.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO 2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
Utilizzare tf.distribute.Strategy.scope
per specificare che deve essere utilizzata una strategia durante la creazione del modello. Questo ti mette nel " contesto di replica incrociata " per questa strategia, il che significa che la strategia ha il controllo di cose come il posizionamento delle variabili.
import mnist
with strategy.scope():
# Model building needs to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
Condivisione automatica dei dati tra i dipendenti
Nella formazione multi-lavoratore, lo sharding del set di dati non è necessariamente necessario, tuttavia fornisce una semantica esatta che rende più riproducibile una formazione maggiore, ad esempio la formazione su più lavoratori dovrebbe essere uguale alla formazione su un lavoratore. Nota: le prestazioni possono essere influenzate in alcuni casi.
Vedi: distribute_datasets_from_function
per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
with strategy.scope():
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
Definisci il ciclo di addestramento personalizzato e addestra il modello
Specificare un ottimizzatore
with strategy.scope():
# The creation of optimizer and train_accuracy will need to be in
# `strategy.scope()` as well, since they create variables.
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
Definisci una fase di allenamento con tf.function
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
Salvataggio e ripristino del checkpoint
L'implementazione del checkpoint in un ciclo di formazione personalizzato richiede che l'utente lo gestisca invece di usare un callback keras. Consente di salvare i pesi del modello e ripristinarli senza dover salvare l'intero modello.
from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and "chief" not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
Qui creerai un tf.train.Checkpoint
che tiene traccia del modello, che è gestito da un tf.train.CheckpointManager
in modo che venga conservato solo il checkpoint più recente.
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
Ora, quando devi ripristinare, puoi trovare l'ultimo checkpoint salvato utilizzando la comoda funzione tf.train.latest_checkpoint
.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
Dopo aver ripristinato il checkpoint, puoi continuare ad addestrare il tuo ciclo di allenamento personalizzato.
num_epochs = 3
num_steps_per_epoch = 70
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
# Once the `CheckpointManager` is set up, you're now ready to save, and remove
# the checkpoints non-chief workers saved.
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.849107, train_loss: 0.491886. Epoch: 1, accuracy: 0.937835, train_loss: 0.197650. Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.
Configurazione del codice completo sui lavoratori
Per eseguire effettivamente MultiWorkerMirroredStrategy
dovrai eseguire i processi di lavoro e passare loro un TF_CONFIG
.
Come il file mnist.py
scritto in precedenza, ecco il main.py
che contiene lo stesso codice che abbiamo esaminato passo dopo passo in precedenza in questa colab, lo stiamo semplicemente scrivendo in un file in modo che ciascuno dei lavoratori lo eseguirà:
File: main.py
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist
from multiprocessing import util
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
num_epochs = 3
num_steps_per_epoch=70
# Checkpoint saving and restoring
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and 'chief' not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
# Define Strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id, cluster_spec = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id,
strategy.cluster_resolver.cluster_spec())
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
# Restoring the checkpoint
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
# Resume our CTL training
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
Writing main.py
Allena e valuta
La directory corrente ora contiene entrambi i file Python:
ls *.py
main.py mnist.py
Quindi json-serializza TF_CONFIG
e aggiungilo alle variabili di ambiente:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Ora puoi avviare un processo di lavoro che eseguirà main.py
e utilizzerà TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
Ci sono alcune cose da notare sul comando precedente:
- Usa
%%bash
che è una "magia" del notebook per eseguire alcuni comandi bash. - Utilizza il flag
--bg
per eseguire il processobash
in background, perché questo lavoratore non verrà terminato. Aspetta tutti i lavoratori prima di iniziare.
Il processo di lavoro in background non stamperà l'output su questo notebook, quindi &>
reindirizza il suo output a un file, in modo da poter vedere cosa è successo.
Quindi, attendi qualche secondo affinché il processo si avvii:
import time
time.sleep(20)
Ora guarda cosa è stato prodotto finora nel file di registro del lavoratore:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
L'ultima riga del file di registro dovrebbe dire: Started server with target: grpc://localhost:12345
. Il primo lavoratore è ora pronto e attende che tutti gli altri lavoratori siano pronti per procedere.
Quindi aggiorna tf_config
affinché il processo del secondo lavoratore raccolga:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Ora avvia il secondo lavoratore. Questo avvierà la formazione poiché tutti i lavoratori sono attivi (quindi non è necessario eseguire il background di questo processo):
python main.py > /dev/null 2>&1
Ora se ricontrolli i log scritti dal primo lavoratore vedrai che ha partecipato all'addestramento di quel modello:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration 2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.832589, train_loss: 0.531260. Epoch: 1, accuracy: 0.936161, train_loss: 0.214774. Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
Formazione multi-lavoratore approfondita
Questo tutorial ha dimostrato un flusso di lavoro Custom Training Loop
della configurazione multi-lavoratore. Una descrizione dettagliata di altri argomenti è disponibile nella model.fit's guide
configurazione multi-lavoratore e applicabile ai CTL.
Guarda anche
- Formazione distribuita nella guida TensorFlow fornisce una panoramica delle strategie di distribuzione disponibili.
- Modelli ufficiali , molti dei quali possono essere configurati per eseguire più strategie di distribuzione.
- La sezione Prestazioni nella guida fornisce informazioni su altre strategie e strumenti che puoi utilizzare per ottimizzare le prestazioni dei tuoi modelli TensorFlow.