Ver en TensorFlow.org | Ejecutar en Google Colab | Ver fuente en GitHub | Descargar libreta |
Descripción general
Este tutorial demuestra la capacitación de varios trabajadores con una API de ciclo de capacitación personalizada, distribuida a través de MultiWorkerMirroredStrategy, por lo que un modelo de Keras diseñado para ejecutarse en un solo trabajador puede funcionar sin problemas en varios trabajadores con un cambio de código mínimo.
Estamos utilizando bucles de entrenamiento personalizados para entrenar nuestro modelo porque nos brindan flexibilidad y un mayor control sobre el entrenamiento. Además, es más fácil depurar el modelo y el ciclo de entrenamiento. Hay información más detallada disponible en Escritura de un ciclo de entrenamiento desde cero .
Si está buscando cómo usar MultiWorkerMirroredStrategy
con keras model.fit
, consulte este tutorial .
La guía Capacitación distribuida en TensorFlow está disponible para obtener una descripción general de las estrategias de distribución que admite TensorFlow para aquellos interesados en una comprensión más profunda de las API de tf.distribute.Strategy
.
Configuración
Primero, algunas importaciones necesarias.
import json
import os
import sys
Antes de importar TensorFlow, realice algunos cambios en el entorno.
Deshabilite todas las GPU. Esto evita errores causados por los trabajadores que intentan usar la misma GPU. Para una aplicación real, cada trabajador estaría en una máquina diferente.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
Restablezca la variable de entorno TF_CONFIG
, verá más sobre esto más adelante.
os.environ.pop('TF_CONFIG', None)
Asegúrese de que el directorio actual esté en la ruta de python. Esto permite que el portátil importe los archivos escritos por %%writefile
más tarde.
if '.' not in sys.path:
sys.path.insert(0, '.')
Ahora importe TensorFlow.
import tensorflow as tf
Definición de conjunto de datos y modelo
A continuación, cree un archivo mnist.py
con un modelo simple y una configuración de conjunto de datos. Este archivo python será utilizado por los procesos de trabajo en este tutorial:
%%writefile mnist.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the range [0, 255].
# You need to convert them to float32 with values in the range [0, 1]
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000)
return train_dataset
def dataset_fn(global_batch_size, input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = mnist_dataset(batch_size)
dataset = dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
return dataset
def build_cnn_model():
return tf.keras.Sequential([
tf.keras.Input(shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
Writing mnist.py
Configuración de varios trabajadores
Ahora entremos en el mundo de la formación multitrabajador. En TensorFlow, la variable de entorno TF_CONFIG
es necesaria para el entrenamiento en varias máquinas, cada una de las cuales posiblemente tenga una función diferente. TF_CONFIG
, que se usa a continuación, es una cadena JSON que se usa para especificar la configuración del clúster en cada trabajador que forma parte del clúster. Este es el método predeterminado para especificar un clúster, utilizando cluster_resolver.TFConfigClusterResolver
, pero hay otras opciones disponibles en el módulo distribute.cluster_resolver
.
Describa su clúster
Aquí hay una configuración de ejemplo:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
Aquí está el mismo TF_CONFIG
serializado como una cadena JSON:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
Hay dos componentes de TF_CONFIG
: cluster
y task
.
cluster
es el mismo para todos los trabajadores y brinda información sobre el clúster de capacitación, que es un dict que consta de diferentes tipos de trabajos, comoworker
. En la capacitación de varios trabajadores conMultiWorkerMirroredStrategy
, generalmente hay unworker
que asume un poco más de responsabilidad, como guardar el punto de control y escribir un archivo de resumen para TensorBoard, además de lo que hace unworker
normal. Dicho trabajador se denomina trabajadorchief
, y es costumbre que elworker
conindex
0 sea designado comoworker
jefe (de hecho, así es como se implementatf.distribute.Strategy
).task
proporciona información de la tarea actual y es diferente en cada trabajador. Especifica eltype
y elindex
de ese trabajador.
En este ejemplo, establece el type
de tarea en "worker"
y el index
de la tarea en 0
. Esta máquina es el primer trabajador y será designado como el trabajador principal y hará más trabajo que los demás. Tenga en cuenta que otras máquinas necesitarán tener configurada la variable de entorno TF_CONFIG
también, y debería tener el mismo dict de cluster
, pero diferente type
de tarea o index
de tarea dependiendo de cuáles sean las funciones de esas máquinas.
Con fines ilustrativos, este tutorial muestra cómo se puede configurar un TF_CONFIG
con 2 trabajadores en localhost
. En la práctica, los usuarios crearían varios trabajadores en puertos/direcciones IP externas y establecerían TF_CONFIG
en cada trabajador de forma adecuada.
En este ejemplo, utilizará 2 trabajadores, el TF_CONFIG
del primer trabajador se muestra arriba. Para el segundo trabajador, establecería tf_config['task']['index']=1
Arriba, tf_config
es solo una variable local en python. Para usarlo realmente para configurar el entrenamiento, este diccionario debe serializarse como JSON y colocarse en la variable de entorno TF_CONFIG
.
Variables de entorno y subprocesos en cuadernos
Los subprocesos heredan variables de entorno de su padre. Entonces, si configura una variable de entorno en este proceso de jupyter notebook
:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
Puede acceder a la variable de entorno desde un subproceso:
echo ${GREETINGS}
Hello TensorFlow!
En la siguiente sección, usará esto para pasar TF_CONFIG
a los subprocesos de trabajo. Realmente nunca lanzaría sus trabajos de esta manera, pero es suficiente para los propósitos de este tutorial: para demostrar un ejemplo mínimo de varios trabajadores.
MultiWorkerMirroredStrategy
Para entrenar el modelo, use una instancia de tf.distribute.MultiWorkerMirroredStrategy
, que crea copias de todas las variables en las capas del modelo en cada dispositivo en todos los trabajadores. La guía tf.distribute.Strategy
tiene más detalles sobre esta estrategia.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO 2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
Use tf.distribute.Strategy.scope
para especificar que se debe usar una estrategia al construir su modelo. Esto lo coloca en el " contexto de réplica cruzada " para esta estrategia, lo que significa que la estrategia tiene el control de cosas como la ubicación de variables.
import mnist
with strategy.scope():
# Model building needs to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
Fragmentación automática de sus datos entre trabajadores
En el entrenamiento de varios trabajadores, la fragmentación del conjunto de datos no es necesariamente necesaria; sin embargo, le brinda una semántica exactamente una vez que hace que más entrenamiento sea más reproducible, es decir, el entrenamiento en varios trabajadores debe ser lo mismo que el entrenamiento en un trabajador. Nota: el rendimiento puede verse afectado en algunos casos.
Ver: distribute_datasets_from_function
per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
with strategy.scope():
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
Definir bucle de entrenamiento personalizado y entrenar el modelo
Especificar un optimizador
with strategy.scope():
# The creation of optimizer and train_accuracy will need to be in
# `strategy.scope()` as well, since they create variables.
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
Definir un paso de entrenamiento con tf.function
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
Guardar y restaurar puntos de control
La implementación de puntos de control en un ciclo de entrenamiento personalizado requiere que el usuario lo maneje en lugar de usar una devolución de llamada de keras. Le permite guardar los pesos del modelo y restaurarlos sin tener que guardar todo el modelo.
from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and "chief" not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
Aquí, creará un tf.train.Checkpoint
que rastrea el modelo, que es administrado por un tf.train.CheckpointManager
para que solo se conserve el último punto de control.
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
Ahora, cuando necesite restaurar, puede encontrar el último punto de control guardado usando la conveniente función tf.train.latest_checkpoint
.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
Después de restaurar el punto de control, puede continuar entrenando su bucle de entrenamiento personalizado.
num_epochs = 3
num_steps_per_epoch = 70
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
# Once the `CheckpointManager` is set up, you're now ready to save, and remove
# the checkpoints non-chief workers saved.
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.849107, train_loss: 0.491886. Epoch: 1, accuracy: 0.937835, train_loss: 0.197650. Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.
Configuración de código completo en trabajadores
Para ejecutar realmente con MultiWorkerMirroredStrategy
, deberá ejecutar procesos de trabajo y pasarles un TF_CONFIG
.
Al igual que el archivo mnist.py
escrito anteriormente, aquí está el main.py
que contiene el mismo código que vimos paso a paso anteriormente en esta colaboración, solo lo estamos escribiendo en un archivo para que cada uno de los trabajadores lo ejecute:
Archivo: main.py
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist
from multiprocessing import util
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
num_epochs = 3
num_steps_per_epoch=70
# Checkpoint saving and restoring
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and 'chief' not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
# Define Strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id, cluster_spec = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id,
strategy.cluster_resolver.cluster_spec())
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
# Restoring the checkpoint
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
# Resume our CTL training
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
Writing main.py
Capacitar y Evaluar
El directorio actual ahora contiene ambos archivos de Python:
ls *.py
main.py mnist.py
Así que json serialice el TF_CONFIG
y agréguelo a las variables de entorno:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Ahora, puede iniciar un proceso de trabajo que ejecutará main.py
y usará TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
Hay algunas cosas a tener en cuenta sobre el comando anterior:
- Utiliza
%%bash
, que es un cuaderno "mágico" para ejecutar algunos comandos de bash. - Utiliza el indicador
--bg
para ejecutar el procesobash
en segundo plano, porque este trabajador no terminará. Espera a todos los trabajadores antes de comenzar.
El proceso de trabajo en segundo plano no imprimirá la salida en este cuaderno, por lo que &>
redirige su salida a un archivo, para que pueda ver lo que sucedió.
Entonces, espere unos segundos para que el proceso se inicie:
import time
time.sleep(20)
Ahora mire lo que se ha enviado al archivo de registro del trabajador hasta ahora:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
La última línea del archivo de registro debería decir: Started server with target: grpc://localhost:12345
. El primer trabajador ahora está listo y está esperando que todos los demás trabajadores estén listos para continuar.
Así que actualice tf_config
para que el proceso del segundo trabajador lo recoja:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Ahora lanza el segundo trabajador. Esto iniciará la capacitación ya que todos los trabajadores están activos (por lo que no es necesario poner en segundo plano este proceso):
python main.py > /dev/null 2>&1
Ahora, si vuelve a verificar los registros escritos por el primer trabajador, verá que participó en el entrenamiento de ese modelo:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration 2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.832589, train_loss: 0.531260. Epoch: 1, accuracy: 0.936161, train_loss: 0.214774. Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
Formación multitrabajador en profundidad
Este tutorial ha demostrado un flujo de trabajo de Custom Training Loop
de la configuración de varios trabajadores. Una descripción detallada de otros temas está disponible en la model.fit's guide
de la configuración de varios trabajadores y aplicable a los CTL.
Ver también
- La guía Capacitación distribuida en TensorFlow brinda una descripción general de las estrategias de distribución disponibles.
- Modelos oficiales , muchos de los cuales se pueden configurar para ejecutar múltiples estrategias de distribución.
- La sección Rendimiento de la guía brinda información sobre otras estrategias y herramientas que puede usar para optimizar el rendimiento de sus modelos de TensorFlow.