Ver en TensorFlow.org | Ejecutar en Google Colab | Ver fuente en GitHub | Descargar libreta |
Descripción general
Este tutorial demuestra cómo se puede usar tf.distribute.Strategy
para la capacitación distribuida de varios trabajadores con tf.estimator
. Si escribe su código usando tf.estimator
y está interesado en escalar más allá de una sola máquina con alto rendimiento, este tutorial es para usted.
Antes de comenzar, lea la guía de estrategia de distribución . El tutorial de entrenamiento multi-GPU también es relevante, porque este tutorial usa el mismo modelo.
Configuración
Primero, configure TensorFlow y las importaciones necesarias.
import tensorflow_datasets as tfds
import tensorflow as tf
import os, json
tf.compat.v1.disable_eager_execution()
Función de entrada
Este tutorial usa el conjunto de datos MNIST de TensorFlow Datasets . El código aquí es similar al tutorial de capacitación de múltiples GPU con una diferencia clave: cuando se usa Estimator para la capacitación de varios trabajadores, es necesario fragmentar el conjunto de datos por la cantidad de trabajadores para garantizar la convergencia del modelo. Los datos de entrada se fragmentan por índice de trabajador, de modo que cada trabajador procesa 1/num_workers
porciones distintas del conjunto de datos.
BUFFER_SIZE = 10000
BATCH_SIZE = 64
def input_fn(mode, input_context=None):
datasets, info = tfds.load(name='mnist',
with_info=True,
as_supervised=True)
mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
datasets['test'])
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label
if input_context:
mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
Otro enfoque razonable para lograr la convergencia sería barajar el conjunto de datos con distintas semillas en cada trabajador.
Configuración de varios trabajadores
Una de las diferencias clave en este tutorial (en comparación con el tutorial de capacitación de múltiples GPU ) es la configuración de varios trabajadores. La variable de entorno TF_CONFIG
es la forma estándar de especificar la configuración del clúster para cada trabajador que forma parte del clúster.
Hay dos componentes de TF_CONFIG
: cluster
y task
. cluster
proporciona información sobre todo el clúster, es decir, los trabajadores y los servidores de parámetros en el clúster. task
proporciona información sobre la tarea actual. El primer componente del cluster
es el mismo para todos los trabajadores y servidores de parámetros del clúster, y la task
del segundo componente es diferente en cada trabajador y servidor de parámetros y especifica su propio type
e index
. En este ejemplo, el type
de tarea es worker
y el index
la tarea es 0
.
Con fines ilustrativos, este tutorial muestra cómo configurar un TF_CONFIG
con 2 trabajadores en localhost
. En la práctica, crearía varios trabajadores en una dirección IP y un puerto externos, y establecería TF_CONFIG
en cada trabajador de forma adecuada, es decir, modificaría el index
tareas.
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:12345", "localhost:23456"]
},
'task': {'type': 'worker', 'index': 0}
})
Definir el modelo
Escriba las capas, el optimizador y la función de pérdida para el entrenamiento. Este tutorial define el modelo con capas de Keras, similar al tutorial de entrenamiento multi-GPU .
LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])
logits = model(features, training=False)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {'logits': logits}
return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)
optimizer = tf.compat.v1.train.GradientDescentOptimizer(
learning_rate=LEARNING_RATE)
loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(mode, loss=loss)
return tf.estimator.EstimatorSpec(
mode=mode,
loss=loss,
train_op=optimizer.minimize(
loss, tf.compat.v1.train.get_or_create_global_step()))
MultiWorkerMirroredStrategy
Para entrenar el modelo, use una instancia de tf.distribute.experimental.MultiWorkerMirroredStrategy
. MultiWorkerMirroredStrategy
crea copias de todas las variables en las capas del modelo en cada dispositivo de todos los trabajadores. Utiliza CollectiveOps
, una operación de TensorFlow para la comunicación colectiva, para agregar gradientes y mantener las variables sincronizadas. La guía tf.distribute.Strategy
tiene más detalles sobre esta estrategia.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_7505/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version. Instructions for updating: use distribute.MultiWorkerMirroredStrategy instead INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO
Entrenar y evaluar el modelo.
A continuación, especifique la estrategia de distribución en RunConfig
para el estimador, y entrene y evalúe invocando tf.estimator.train_and_evaluate
. Este tutorial distribuye solo el entrenamiento especificando la estrategia a través train_distribute
. También es posible distribuir la evaluación a través eval_distribute
.
config = tf.estimator.RunConfig(train_distribute=strategy)
classifier = tf.estimator.Estimator(
model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
classifier,
train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies. INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } , '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7f3404234490>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None} INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Running training and evaluation locally (non-distributed). INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/estimator.py:1244: StrategyBase.configure (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: use `update_config_proto` instead. INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy INFO:tensorflow:Calling model_fn. /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options. warnings.warn("To make it possible to preserve tf.data options across " INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Create CheckpointSaverHook. INFO:tensorflow:Create CheckpointSaverHook. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version. Instructions for updating: Use the iterator's `initializer` property instead. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version. Instructions for updating: Use the iterator's `initializer` property instead. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... 2022-01-26 05:29:43.503603: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} } . Registered: device='CPU' 2022-01-26 05:29:43.504873: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} } . Registered: device='CPU' INFO:tensorflow:loss = 2.292878, step = 0 INFO:tensorflow:loss = 2.292878, step = 0 INFO:tensorflow:global_step/sec: 173.275 INFO:tensorflow:global_step/sec: 173.275 INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec) INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec) INFO:tensorflow:global_step/sec: 189.057 INFO:tensorflow:global_step/sec: 189.057 INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec) INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec) INFO:tensorflow:global_step/sec: 193.075 INFO:tensorflow:global_step/sec: 193.075 INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec) INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec) INFO:tensorflow:global_step/sec: 199.957 INFO:tensorflow:global_step/sec: 199.957 INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec) INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec) INFO:tensorflow:global_step/sec: 204.217 INFO:tensorflow:global_step/sec: 204.217 INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec) INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec) INFO:tensorflow:global_step/sec: 201.747 INFO:tensorflow:global_step/sec: 201.747 INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec) INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec) INFO:tensorflow:global_step/sec: 206.079 INFO:tensorflow:global_step/sec: 206.079 INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec) INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec) INFO:tensorflow:global_step/sec: 231.299 INFO:tensorflow:global_step/sec: 231.299 INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec) INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec) INFO:tensorflow:global_step/sec: 657.044 INFO:tensorflow:global_step/sec: 657.044 INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec) INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec) INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938... INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938... INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938... INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938... INFO:tensorflow:Calling model_fn. INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56 INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56 INFO:tensorflow:Graph was finalized. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Running local_init_op. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Evaluation [10/100] INFO:tensorflow:Evaluation [10/100] INFO:tensorflow:Evaluation [20/100] INFO:tensorflow:Evaluation [20/100] INFO:tensorflow:Evaluation [30/100] INFO:tensorflow:Evaluation [30/100] INFO:tensorflow:Evaluation [40/100] INFO:tensorflow:Evaluation [40/100] INFO:tensorflow:Evaluation [50/100] INFO:tensorflow:Evaluation [50/100] INFO:tensorflow:Evaluation [60/100] INFO:tensorflow:Evaluation [60/100] INFO:tensorflow:Evaluation [70/100] INFO:tensorflow:Evaluation [70/100] INFO:tensorflow:Evaluation [80/100] INFO:tensorflow:Evaluation [80/100] INFO:tensorflow:Evaluation [90/100] INFO:tensorflow:Evaluation [90/100] INFO:tensorflow:Evaluation [100/100] INFO:tensorflow:Evaluation [100/100] INFO:tensorflow:Inference Time : 2.04637s INFO:tensorflow:Inference Time : 2.04637s INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58 INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58 INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131 INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Loss for final step: 1.10881. INFO:tensorflow:Loss for final step: 1.10881. ({'loss': 2.234131, 'global_step': 938}, [])
Optimizar el rendimiento del entrenamiento
Ahora tiene un modelo y un estimador con capacidad para varios trabajadores impulsado por tf.distribute.Strategy
. Puede probar las siguientes técnicas para optimizar el rendimiento de la capacitación de varios trabajadores:
- Aumente el tamaño del lote: el tamaño del lote especificado aquí es por GPU. En general, se recomienda el tamaño de lote más grande que se ajuste a la memoria de la GPU.
- Cast variables: Cast las variables a
tf.float
si es posible. El modelo oficial de ResNet incluye un ejemplo de cómo se puede hacer esto. Utilice la comunicación colectiva:
MultiWorkerMirroredStrategy
proporciona múltiples implementaciones de comunicación colectiva .-
RING
implementa colectivos basados en anillos mediante gRPC como capa de comunicación entre hosts. -
NCCL
usa NCCL de Nvidia para implementar colectivos. -
AUTO
difiere la elección al tiempo de ejecución.
La mejor opción de implementación colectiva depende de la cantidad y el tipo de GPU y la interconexión de red en el clúster. Para anular la elección automática, especifique un valor válido para el parámetro de
communication
del constructor deMultiWorkerMirroredStrategy
, por ejemplo,communication=tf.distribute.experimental.CollectiveCommunication.NCCL
.-
Visite la sección Rendimiento de la guía para obtener más información sobre otras estrategias y herramientas que puede usar para optimizar el rendimiento de sus modelos de TensorFlow.
Otros ejemplos de código
- Ejemplo de extremo a extremo para la capacitación de varios trabajadores en tensorflow/ecosistema utilizando plantillas de Kubernetes. Este ejemplo comienza con un modelo de Keras y lo convierte en un estimador utilizando la API
tf.keras.estimator.model_to_estimator
. - Modelos oficiales , muchos de los cuales se pueden configurar para ejecutar múltiples estrategias de distribución.