Voir sur TensorFlow.org | Exécuter dans Google Colab | Voir la source sur GitHub | Télécharger le cahier |
Aperçu
La formation de serveur de paramètres est une méthode commune de données parallèles pour mettre à l'échelle la formation de modèles sur plusieurs machines.
Un cluster de formation de serveur de paramètres se compose de nœuds de calcul et de serveurs de paramètres . Les variables sont créées sur les serveurs de paramètres et elles sont lues et mises à jour par les travailleurs à chaque étape. Par défaut, les agents lisent et mettent à jour ces variables indépendamment sans se synchroniser les uns avec les autres. C'est pourquoi la formation de type serveur de paramètres est parfois appelée formation asynchrone .
Dans TensorFlow 2, la formation des serveurs de paramètres est alimentée par la classe tf.distribute.experimental.ParameterServerStrategy
, qui distribue les étapes de formation à un cluster qui évolue jusqu'à des milliers de nœuds de calcul (accompagnés de serveurs de paramètres).
Méthodes de formation prises en charge
Il existe deux principales méthodes de formation prises en charge :
- L'API Keras
Model.fit
, qui est recommandée lorsque vous préférez une abstraction et une gestion de haut niveau de la formation. - Une boucle de formation personnalisée (vous pouvez vous référer à Formation personnalisée , Rédaction d'une boucle de formation à partir de zéro et Boucle de formation personnalisée avec Keras et MultiWorkerMirroredStrategy pour plus de détails.) La formation en boucle personnalisée est recommandée lorsque vous préférez définir les détails de leur boucle de formation.
Un cluster avec des jobs et des tâches
Quelle que soit l'API choisie ( Model.fit
ou une boucle d'entraînement personnalisée), l'entraînement distribué dans TensorFlow 2 implique : un 'cluster'
avec plusieurs 'jobs'
, et chacune des tâches peut avoir une ou plusieurs 'tasks'
.
Lors de l'utilisation de la formation du serveur de paramètres, il est recommandé d'avoir :
- Un poste de coordinateur (qui porte le nom de poste
chief
) - Plusieurs tâches de travail (nom de la tâche
worker
) ; et - Travaux de serveur à paramètres multiples (nom du travail
ps
)
Pendant que le coordinateur crée des ressources, distribue des tâches de formation, écrit des points de contrôle et traite les échecs de tâches, les travailleurs et les serveurs de paramètres exécutent tf.distribute.Server
qui écoute les demandes du coordinateur.
Formation du serveur de paramètres avec l'API Model.fit
La formation du serveur de paramètres avec l'API Model.fit
nécessite que le coordinateur utilise un objet tf.distribute.experimental.ParameterServerStrategy
et un tf.keras.utils.experimental.DatasetCreator
comme entrée. Semblable à l'utilisation de Model.fit
sans stratégie, ou avec d'autres stratégies, le flux de travail implique la création et la compilation du modèle, la préparation des rappels, suivis d'un appel Model.fit
.
Entraînement du serveur de paramètres avec une boucle d'entraînement personnalisée
Avec les boucles de formation personnalisées, la classe tf.distribute.experimental.coordinator.ClusterCoordinator
est le composant clé utilisé pour le coordinateur.
- La classe
ClusterCoordinator
doit fonctionner conjointement avec un objettf.distribute.Strategy
. - Cet objet
tf.distribute.Strategy
est nécessaire pour fournir les informations du cluster et est utilisé pour définir une étape de formation, comme illustré dans Formation personnalisée avec tf.distribute.Strategy . - L'objet
ClusterCoordinator
distribue ensuite l'exécution de ces étapes de formation aux travailleurs distants. - Pour la formation du serveur de paramètres, le
ClusterCoordinator
doit travailler avec untf.distribute.experimental.ParameterServerStrategy
.
L'API la plus importante fournie par l'objet ClusterCoordinator
est schedule
:
- L'API de
schedule
met en file d'attente une fonction tf. et renvoie immédiatement unetf.function
de typeRemoteValue
. - Les fonctions en file d'attente seront distribuées aux travailleurs distants dans les threads d'arrière-plan et leurs
RemoteValue
seront remplies de manière asynchrone. - Étant donné que
schedule
ne nécessite pas d'affectation de travailleur, latf.function
transmise peut être exécutée sur n'importe quel travailleur disponible. - Si le worker sur lequel elle est exécutée devient indisponible avant son achèvement, la fonction sera réessayée sur un autre worker disponible.
- De ce fait et du fait que l'exécution d'une fonction n'est pas atomique, une fonction peut être exécutée plus d'une fois.
En plus de répartir les fonctions à distance, le ClusterCoordinator
aide également à créer des ensembles de données sur tous les travailleurs et à reconstruire ces ensembles de données lorsqu'un travailleur récupère d'un échec.
Configuration du didacticiel
Le didacticiel se ramifiera dans Model.fit
et les chemins de boucle de formation personnalisés, et vous pourrez choisir celui qui correspond à vos besoins. Les sections autres que "S'entraîner avec X" s'appliquent aux deux parcours.
pip install portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
Configuration du cluster
Comme mentionné ci-dessus, un cluster de formation de serveur de paramètres nécessite une tâche de coordinateur qui exécute votre programme de formation, un ou plusieurs nœuds de calcul et des tâches de serveur de paramètres qui exécutent des serveurs TensorFlow ( tf.distribute.Server
et éventuellement une tâche d'évaluation supplémentaire qui exécute une évaluation parallèle. (voir la section d'évaluation du side-car ci-dessous). Les conditions requises pour les mettre en place sont les suivantes :
- La tâche de coordinateur doit connaître les adresses et les ports de tous les autres serveurs TensorFlow, à l'exception de l'évaluateur.
- Les nœuds de calcul et les serveurs de paramètres doivent savoir sur quel port ils doivent écouter. Par souci de simplicité, vous pouvez généralement transmettre les informations complètes du cluster lors de la création de serveurs TensorFlow sur ces tâches.
- La tâche de l'évaluateur n'a pas besoin de connaître la configuration du cluster de formation. Si tel est le cas, il ne doit pas tenter de se connecter au cluster de formation.
- Les travailleurs et les serveurs de paramètres doivent avoir des types de tâche comme
"worker"
et"ps"
, respectivement. Le coordinateur doit utiliser"chief"
comme type de tâche pour des raisons d'héritage.
Dans ce didacticiel, vous allez créer un cluster in-process afin que l'ensemble de la formation du serveur de paramètres puisse être exécuté dans Colab. Vous apprendrez à configurer de vrais clusters dans une section ultérieure.
Cluster en cours
Vous commencerez par créer plusieurs serveurs TensorFlow à l'avance et vous vous y connecterez ultérieurement. Notez que ce n'est que dans le but de la démonstration de ce tutoriel, et dans le cadre d'une formation réelle, les serveurs seront démarrés sur des machines "worker"
et "ps"
.
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name="worker",
task_index=i,
config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec,
job_name="ps",
task_index=i,
protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
La configuration du cluster in-process est fréquemment utilisée dans les tests unitaires, comme ici .
Une autre option pour les tests locaux consiste à lancer des processus sur la machine locale. Consultez Formation multi-travailleurs avec Keras pour un exemple de cette approche.
Instancier une ParameterServerStrategy
Avant de vous plonger dans le code de formation, instancions un objet ParameterServerStrategy
. Notez que cela est nécessaire, que vous procédiez avec Model.fit
ou une boucle d'entraînement personnalisée. L'argument variable_partitioner
sera expliqué dans la section Variable sharding .
variable_partitioner = (
tf.distribute.experimental.partitioners.MinSizePartitioner(
min_shard_bytes=(256 << 10),
max_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0' INFO:tensorflow:Number of GPUs on workers: 1
Afin d'utiliser des GPU pour la formation, allouez des GPU visibles à chaque travailleur. ParameterServerStrategy
utilisera tous les GPU disponibles sur chaque travailleur, avec la restriction que tous les travailleurs doivent avoir le même nombre de GPU disponibles.
Partage variable
Le sharding variable fait référence au fractionnement d'une variable en plusieurs variables plus petites, appelées shards . Le partitionnement variable peut être utile pour répartir la charge du réseau lors de l'accès à ces partitions. Il est également utile de répartir le calcul et le stockage d'une variable normale sur plusieurs serveurs de paramètres.
Pour activer le partitionnement variable, vous pouvez transmettre un variable_partitioner
lors de la construction d'un objet ParameterServerStrategy
. Le variable_partitioner
sera appelé chaque fois qu'une variable est créée et il est prévu de renvoyer le nombre de fragments le long de chaque dimension de la variable. Certains variable_partitioner
prêts à l'emploi sont fournis, tels que tf.distribute.experimental.partitioners.MinSizePartitioner
. Il est recommandé d'utiliser des partitionneurs basés sur la taille comme tf.distribute.experimental.partitioners.MinSizePartitioner
pour éviter de partitionner de petites variables, ce qui pourrait avoir un impact négatif sur la vitesse de formation du modèle.
Lorsqu'un variable_partitioner
est passé et si vous créez une variable directement sous strategy.scope()
, il deviendra un type de conteneur avec une propriété variables
qui donne accès à la liste des fragments. Dans la plupart des cas, ce conteneur sera automatiquement converti en Tensor en concaténant tous les fragments. Par conséquent, il peut être utilisé comme une variable normale. D'autre part, certaines méthodes TensorFlow telles que tf.nn.embedding_lookup
fournissent une implémentation efficace pour ce type de conteneur et dans ces méthodes, la concaténation automatique sera évitée.
Veuillez consulter la documentation de l'API de tf.distribute.experimental.ParameterServerStrategy
pour plus de détails.
Entraînement avec Model.fit
Keras fournit une API de formation facile à utiliser via Model.fit
qui gère la boucle de formation sous le capot, avec la flexibilité de train_step
et des rappels, qui fournissent des fonctionnalités telles que l'enregistrement de points de contrôle ou l'enregistrement de résumé pour TensorBoard. Avec Model.fit
, le même code d'entraînement peut être utilisé pour d'autres stratégies avec un simple échange de l'objet de stratégie.
Des données d'entrée
Model.fit
avec la formation du serveur de paramètres nécessite que les données d'entrée soient fournies dans un appelable qui prend un seul argument de type tf.distribute.InputContext
et renvoie un tf.data.Dataset
. Ensuite, créez un objet tf.keras.utils.experimental.DatasetCreator
qui prend un tel callable
et un objet tf.distribute.InputOptions
facultatif via l'argument input_options
.
Notez qu'il est recommandé de mélanger et de répéter les données avec la formation du serveur de paramètres et de spécifier steps_per_epoch
dans l'appel fit
afin que la bibliothèque connaisse les limites de l'époque.
Veuillez consulter le didacticiel sur l' entrée distribuée pour plus d'informations sur l'argument InputContext
.
def dataset_fn(input_context):
global_batch_size = 64
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.shard(
input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2)
return dataset
dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
Le code dans dataset_fn
sera appelé sur le périphérique d'entrée, qui est généralement le processeur, sur chacune des machines de travail.
Construction et compilation de modèles
Maintenant, vous allez créer un tf.keras.Model
- un modèle trivial tf.keras.models.Sequential
à des fins de démonstration - suivi d'un appel Model.compile
pour incorporer des composants, tels qu'un optimiseur, des métriques ou des paramètres tels que steps_per_execution
:
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)
Rappels et formation
Avant d'appeler model.fit
pour la formation proprement dite, préparons les rappels nécessaires pour les tâches courantes, telles que :
-
ModelCheckpoint
: pour enregistrer les poids du modèle. -
BackupAndRestore
: pour s'assurer que la progression de la formation est automatiquement sauvegardée et récupérée en cas d'indisponibilité du cluster (telle qu'un abandon ou une préemption) ; ou -
TensorBoard
: pour enregistrer les rapports d'avancement dans des fichiers de synthèse, qui sont visualisés dans l'outil TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=log_dir),
tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step Epoch 2/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step Epoch 3/5 WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. 20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step Epoch 4/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step Epoch 5/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step <keras.callbacks.History at 0x7f89984ca890>
Utilisation directe avec ClusterCoordinator
(facultatif)
Même si vous choisissez le parcours de formation Model.fit
, vous pouvez éventuellement instancier un objet tf.distribute.experimental.coordinator.ClusterCoordinator
pour planifier d'autres fonctions que vous souhaitez exécuter sur les nœuds de calcul. Voir la section Entraînement avec une boucle d'entraînement personnalisée pour plus de détails et d'exemples.
Entraînement avec une boucle d'entraînement personnalisée
L'utilisation de boucles d'entraînement personnalisées avec tf.distribute.Strategy
offre une grande flexibilité pour définir des boucles d'entraînement. Avec le ParameterServerStrategy
défini ci-dessus (en tant que strategy
), vous utiliserez un tf.distribute.experimental.coordinator.ClusterCoordinator
pour répartir l'exécution des étapes de formation aux travailleurs distants.
Ensuite, vous allez créer un modèle, définir un jeu de données et une fonction d'étape, comme vous l'avez fait dans la boucle d'entraînement avec d'autres tf.distribute.Strategy
s. Vous pouvez trouver plus de détails dans le didacticiel Formation personnalisée avec tf.distribute.Strategy .
Pour garantir une prélecture efficace des ensembles de données, utilisez les API de création d'ensembles de données distribuées recommandées mentionnées dans la section Distribuer les étapes de formation aux travailleurs distants ci-dessous. Assurez-vous également d'appeler Strategy.run
dans worker_fn
pour tirer pleinement parti des GPU alloués aux travailleurs. Le reste des étapes est le même pour l'entraînement avec ou sans GPU.
Créons ces composants dans les étapes suivantes :
Configurer les données
Tout d'abord, écrivez une fonction qui crée un ensemble de données qui inclut la logique de prétraitement implémentée par les couches de prétraitement Keras .
Vous allez créer ces couches en dehors du dataset_fn
mais appliquer la transformation à l'intérieur du dataset_fn
, puisque vous encapsulerez le dataset_fn
dans un tf.function
, qui ne permet pas de créer des variables à l'intérieur.
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=feature_vocab,
mask_token=None)
label_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = tf.keras.layers.Input(
shape=(3,),
dtype=tf.string,
name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = tf.keras.Model(
{"features": raw_feature_input},
feature_id_input)
raw_label_input = tf.keras.layers.Input(
shape=(1,),
dtype=tf.string,
name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = tf.keras.Model(
{"label": raw_label_input},
label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison return bool(asarray(a1 == a2).all())
Générez des exemples de jouets dans un ensemble de données :
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
Ensuite, créez le jeu de données d'entraînement enveloppé dans un dataset_fn
:
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
Construire le modèle
Créez ensuite le modèle et les autres objets. Assurez-vous de créer toutes les variables sous strategy.scope
.
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with Keras processing layers.
model_input = tf.keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = tf.keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
model = tf.keras.Model({"features": model_input}, dense_output)
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
accuracy = tf.keras.metrics.Accuracy()
Confirmons que l'utilisation de FixedShardsPartitioner
divisé toutes les variables en deux partitions et que chaque partition a été affectée à différents serveurs de paramètres :
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
Définir l'étape de formation
Troisièmement, créez l'étape de formation enveloppée dans un tf.function
:
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
Dans la fonction d'étape de formation ci-dessus, l'appel Strategy.run
et Strategy.reduce
dans step_fn
peut prendre en charge plusieurs GPU par travailleur. Si les nœuds de calcul ont des GPU alloués, Strategy.run
distribuera les jeux de données sur plusieurs réplicas.
Distribuez les étapes de formation aux télétravailleurs
Une fois tous les calculs définis par ParameterServerStrategy
, vous utiliserez la classe tf.distribute.experimental.coordinator.ClusterCoordinator
pour créer des ressources et distribuer les étapes de formation aux travailleurs distants.
Commençons par créer un objet ClusterCoordinator
et transmettons l'objet stratégie :
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
Ensuite, créez un jeu de données par travailleur et un itérateur. Dans le per_worker_dataset_fn
ci-dessous, il est recommandé d'encapsuler le dataset_fn
dans strategy.distribute_datasets_from_function
pour permettre une prélecture efficace vers les GPU de manière transparente.
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
La dernière étape consiste à distribuer le calcul aux travailleurs distants à l'aide ClusterCoordinator.schedule
:
- La méthode
schedule
met en file d'attente unetf.function
et renvoie immédiatement uneRemoteValue
future. Les fonctions en file d'attente seront distribuées aux travailleurs distants dans les threads d'arrière-plan et laRemoteValue
sera remplie de manière asynchrone. - La méthode
join
(ClusterCoordinator.join
) peut être utilisée pour attendre que toutes les fonctions planifiées soient exécutées.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). Finished epoch 0, accuracy is 0.543750. Finished epoch 1, accuracy is 0.543750. Finished epoch 2, accuracy is 0.950000. Finished epoch 3, accuracy is 1.000000.
Voici comment récupérer le résultat d'une RemoteValue
:
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000
Alternativement, vous pouvez lancer toutes les étapes et faire quelque chose en attendant la fin :
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
Pour connaître le flux de travail complet de formation et de diffusion de cet exemple particulier, veuillez consulter ce test .
En savoir plus sur la création d'ensembles de données
L'ensemble de données dans le code ci-dessus est créé à l'aide de l'API ClusterCoordinator.create_per_worker_dataset
). Il crée un ensemble de données par travailleur et renvoie un objet conteneur. Vous pouvez appeler la méthode iter
dessus pour créer un itérateur par travailleur. L'itérateur par travailleur contient un itérateur par travailleur et la tranche correspondante d'un travailleur sera remplacée dans l'argument d'entrée de la fonction transmise à la méthode ClusterCoordinator.schedule
avant que la fonction ne soit exécutée sur un travailleur particulier.
Actuellement, la méthode ClusterCoordinator.schedule
suppose que les nœuds de calcul sont équivalents et suppose donc que les ensembles de données sur différents nœuds de calcul sont identiques, sauf qu'ils peuvent être mélangés différemment s'ils contiennent une opération Dataset.shuffle
. Pour cette raison, il est également recommandé que les ensembles de données soient répétés indéfiniment et que vous planifiez un nombre fini d'étapes au lieu de vous fier à l' OutOfRangeError
d'un ensemble de données.
Une autre remarque importante est que les ensembles de données tf.data
ne prennent pas en charge la sérialisation et la désérialisation implicites dans les limites des tâches. Il est donc important de créer l'ensemble de données complet dans la fonction transmise à ClusterCoordinator.create_per_worker_dataset
.
Évaluation
Il existe plusieurs façons de définir et d'exécuter une boucle d'évaluation dans la formation distribuée. Chacun a ses propres avantages et inconvénients, comme décrit ci-dessous. La méthode d'évaluation en ligne est recommandée si vous n'avez pas de préférence.
Évaluation en ligne
Dans cette méthode, le coordinateur alterne formation et évaluation et c'est ainsi qu'on l'appelle évaluation en ligne .
L'évaluation en ligne présente plusieurs avantages. Par example:
- Il peut prendre en charge de grands modèles d'évaluation et des ensembles de données d'évaluation qu'une seule tâche ne peut pas contenir.
- Les résultats de l'évaluation peuvent être utilisés pour prendre des décisions pour la formation de l'époque suivante.
Il existe deux façons de mettre en œuvre l'évaluation en ligne : l'évaluation directe et l'évaluation distribuée.
- Évaluation directe : pour les petits modèles et les ensembles de données d'évaluation, le coordinateur peut exécuter l'évaluation directement sur le modèle distribué avec l'ensemble de données d'évaluation sur le coordinateur :
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = tf.keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
- Évaluation distribuée : pour les grands modèles ou ensembles de données qu'il est impossible d'exécuter directement sur le coordinateur, la tâche du coordinateur peut distribuer des tâches d'évaluation aux agents via les méthodes
ClusterCoordinator.schedule
/ClusterCoordinator.join
:
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = tf.keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources Evaluation accuracy: 1.000000
Évaluation du side-car
Une autre méthode est appelée évaluation parallèle dans laquelle vous créez une tâche d'évaluateur dédiée qui lit à plusieurs reprises les points de contrôle et exécute l'évaluation sur un dernier point de contrôle. Cela permet à votre programme d'entraînement de se terminer plus tôt si vous n'avez pas besoin de modifier votre boucle d'entraînement en fonction des résultats de l'évaluation. Cependant, il nécessite une tâche d'évaluation supplémentaire et des points de contrôle périodiques pour déclencher l'évaluation. Voici une boucle d'évaluation side-car possible :
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epoches)):
break
Clusters dans le monde réel
Dans un environnement de production réel, vous exécuterez toutes les tâches dans différents processus sur différentes machines. Le moyen le plus simple de configurer les informations de cluster sur chaque tâche consiste à définir les variables d'environnement "TF_CONFIG"
et à utiliser un tf.distribute.cluster_resolver.TFConfigClusterResolver
pour analyser "TF_CONFIG"
.
Pour une description générale des variables d'environnement "TF_CONFIG"
, reportez-vous au guide de formation distribuée .
Si vous démarrez vos tâches de formation à l'aide de Kubernetes ou d'autres modèles de configuration, il est très probable que ces modèles aient déjà défini “TF_CONFIG"
pour vous.
Définissez la variable d'environnement "TF_CONFIG"
Supposons que vous ayez 3 workers et 2 serveurs de paramètres, le "TF_CONFIG"
du worker 1 peut être :
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
Le "TF_CONFIG"
de l'évaluateur peut être :
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
La partie "cluster"
dans la chaîne "TF_CONFIG"
ci-dessus pour l'évaluateur est facultative.
Si vous utilisez le même binaire pour toutes les tâches
Si vous préférez exécuter toutes ces tâches à l'aide d'un seul binaire, vous devrez laisser votre programme se ramifier en différents rôles dès le début :
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# Run side-car evaluation
else:
# Run the coordinator.
Le code suivant démarre un serveur TensorFlow et attend :
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
Gérer l'échec d'une tâche
Échec du travailleur
tf.distribute.experimental.coordinator.ClusterCoordinator
ou Model.fit
fournissent une tolérance aux pannes intégrée en cas d'échec du worker. Lors de la récupération du nœud de calcul, la fonction d'ensemble de données précédemment fournie (soit pour ClusterCoordinator.create_per_worker_dataset
pour une boucle de formation personnalisée, soit tf.keras.utils.experimental.DatasetCreator
pour Model.fit
) sera appelée sur les nœuds de calcul pour recréer les ensembles de données.
Échec du serveur de paramètres ou du coordinateur
Cependant, lorsque le coordinateur voit une erreur de serveur de paramètres, il déclenche immédiatement une UnavailableError
ou une AbortedError
. Vous pouvez redémarrer le coordinateur dans ce cas. Le coordinateur lui-même peut également devenir indisponible. Par conséquent, certains outillages sont recommandés afin de ne pas perdre la progression de l'entraînement :
Pour
Model.fit
, vous devez utiliser un rappelBackupAndRestore
, qui gère automatiquement la progression de l'enregistrement et de la restauration. Voir la section Rappels et formation ci-dessus pour un exemple.Pour une boucle de formation personnalisée, vous devez contrôler périodiquement les variables de modèle et charger les variables de modèle à partir d'un point de contrôle, le cas échéant, avant le début de la formation. La progression de la formation peut être déduite approximativement à partir d'
optimizer.iterations
si un optimiseur est contrôlé :
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epoches):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
Récupérer une RemoteValue
La récupération d'une RemoteValue
est garantie de réussir si une fonction est exécutée avec succès. En effet, actuellement, la valeur de retour est immédiatement copiée dans le coordinateur après l'exécution d'une fonction. En cas d'échec d'un worker pendant la copie, la fonction sera réessayée sur un autre worker disponible. Par conséquent, si vous souhaitez optimiser les performances, vous pouvez planifier des fonctions sans valeur de retour.
Rapport d'erreur
Une fois que le coordinateur voit une erreur telle UnavailableError
des serveurs de paramètres ou d'autres erreurs d'application telles qu'un InvalidArgument
de tf.debugging.check_numerics
, il annulera toutes les fonctions en attente et en file d'attente avant de générer l'erreur. La récupération de leurs RemoteValue
correspondantes lèvera une CancelledError
.
Une fois qu'une erreur est générée, le coordinateur ne générera pas la même erreur ni aucune erreur provenant de fonctions annulées.
Amélioration des performances
Il existe plusieurs raisons possibles si vous rencontrez des problèmes de performances lorsque vous vous entraînez avec ParameterServerStrategy
et ClusterResolver
.
Une raison courante est que les serveurs de paramètres ont une charge déséquilibrée et que certains serveurs de paramètres fortement chargés ont atteint leur capacité. Il peut également y avoir plusieurs causes profondes. Voici quelques méthodes simples pour atténuer ce problème :
- Fractionnez vos grandes variables de modèle en spécifiant un
variable_partitioner
lors de la construction d'unParameterServerStrategy
. - Si possible, évitez de créer une variable hotspot requise par tous les serveurs de paramètres en une seule étape. Par exemple, utilisez un taux d'apprentissage constant ou une sous-classe
tf.keras.optimizers.schedules.LearningRateSchedule
dans les optimiseurs puisque le comportement par défaut est que le taux d'apprentissage deviendra une variable placée sur un serveur de paramètres particulier et demandée par tous les autres serveurs de paramètres à chaque étape . - Mélangez vos grands vocabulaires avant de les passer aux couches de prétraitement Keras.
Une autre raison possible des problèmes de performance est le coordinateur. Votre première implémentation de schedule
/ join
est basée sur Python et peut donc avoir une surcharge de threading. De plus, la latence entre le coordinateur et les travailleurs peut être importante. Si c'est le cas,
Pour
Model.fit
, vous pouvez définir l'argumentsteps_per_execution
fourni dansModel.compile
sur une valeur supérieure à 1.Pour une boucle d'entraînement personnalisée, vous pouvez regrouper plusieurs étapes dans une seule
tf.function
:
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
Au fur et à mesure que la bibliothèque est optimisée, nous espérons que la plupart des utilisateurs n'auront pas à emballer manuellement les étapes à l'avenir.
De plus, une petite astuce pour améliorer les performances consiste à planifier des fonctions sans valeur de retour, comme expliqué dans la section de gestion des échecs de tâche ci-dessus.
Limites connues
La plupart des limitations connues sont déjà couvertes dans les sections ci-dessus. Cette section fournit un résumé.
ParameterServerStrategy
général
-
os.environment["grpc_fail_fast"]="use_caller"
est nécessaire sur chaque tâche, y compris le coordinateur, pour que la tolérance aux pannes fonctionne correctement. - La formation de serveur de paramètres synchrone n'est pas prise en charge.
- Il est généralement nécessaire de regrouper plusieurs étapes dans une seule fonction pour obtenir des performances optimales.
- Il n'est pas pris en charge de charger un saved_model via
tf.saved_model.load
contenant des variables fragmentées. Notez que le chargement d'un tel save_model à l'aide de TensorFlow Serving devrait fonctionner. - Il n'est pas pris en charge de charger un point de contrôle contenant des variables d'emplacement d'optimiseur partitionnées dans un nombre différent de partitions.
- Il n'est pas pris en charge de récupérer d'une défaillance du serveur de paramètres sans redémarrer la tâche du coordinateur.
- L'utilisation de
tf.lookup.StaticHashTable
(qui est couramment utilisé par certaines couches de prétraitement Keras, telles quetf.keras.layers.IntegerLookup
,tf.keras.layers.StringLookup
ettf.keras.layers.TextVectorization
) entraîne des ressources placées sur le coordinateur à ce moment avec une formation de serveur de paramètres. Cela a des implications sur les performances pour les RPC de recherche des nœuds de calcul vers le coordinateur. Il s'agit d'une haute priorité actuelle à traiter.
Model.fit
spécificités
- L'argument
steps_per_epoch
est requis dansModel.fit
. Vous pouvez sélectionner une valeur qui fournit des intervalles appropriés dans une époque. -
ParameterServerStrategy
ne prend pas en charge les rappels personnalisés qui ont des appels au niveau du lot pour des raisons de performances. Vous devez convertir ces appels en appels au niveau de l'époque avecsteps_per_epoch
convenablement choisis, de sorte qu'ils soient appelés à chaque nombre d'steps_per_epoch
. Les rappels intégrés ne sont pas affectés : leurs appels au niveau du lot ont été modifiés pour être performants. La prise en charge des appels au niveau des lots pourParameterServerStrategy
est en cours de planification. - Pour la même raison, contrairement à d'autres stratégies, la barre de progression et les métriques ne sont enregistrées qu'aux limites de l'époque.
-
run_eagerly
n'est pas pris en charge.
Spécificités de la boucle d'entraînement personnalisée
-
ClusterCoordinator.schedule
ne prend pas en charge les garanties de visite pour un ensemble de données. - Lorsque
ClusterCoordinator.create_per_worker_dataset
est utilisé, l'ensemble de données complet doit être créé dans la fonction qui lui est transmise. -
tf.data.Options
est ignoré dans un jeu de données créé parClusterCoordinator.create_per_worker_dataset
.