Formation au serveur de paramètres avec ParameterServerStrategy

Voir sur TensorFlow.org Exécuter dans Google Colab Voir la source sur GitHub Télécharger le cahier

Aperçu

La formation de serveur de paramètres est une méthode commune de données parallèles pour mettre à l'échelle la formation de modèles sur plusieurs machines.

Un cluster de formation de serveur de paramètres se compose de nœuds de calcul et de serveurs de paramètres . Les variables sont créées sur les serveurs de paramètres et elles sont lues et mises à jour par les travailleurs à chaque étape. Par défaut, les agents lisent et mettent à jour ces variables indépendamment sans se synchroniser les uns avec les autres. C'est pourquoi la formation de type serveur de paramètres est parfois appelée formation asynchrone .

Dans TensorFlow 2, la formation des serveurs de paramètres est alimentée par la classe tf.distribute.experimental.ParameterServerStrategy , qui distribue les étapes de formation à un cluster qui évolue jusqu'à des milliers de nœuds de calcul (accompagnés de serveurs de paramètres).

Méthodes de formation prises en charge

Il existe deux principales méthodes de formation prises en charge :

Un cluster avec des jobs et des tâches

Quelle que soit l'API choisie ( Model.fit ou une boucle d'entraînement personnalisée), l'entraînement distribué dans TensorFlow 2 implique : un 'cluster' avec plusieurs 'jobs' , et chacune des tâches peut avoir une ou plusieurs 'tasks' .

Lors de l'utilisation de la formation du serveur de paramètres, il est recommandé d'avoir :

  • Un poste de coordinateur (qui porte le nom de poste chief )
  • Plusieurs tâches de travail (nom de la tâche worker ) ; et
  • Travaux de serveur à paramètres multiples (nom du travail ps )

Pendant que le coordinateur crée des ressources, distribue des tâches de formation, écrit des points de contrôle et traite les échecs de tâches, les travailleurs et les serveurs de paramètres exécutent tf.distribute.Server qui écoute les demandes du coordinateur.

Formation du serveur de paramètres avec l'API Model.fit

La formation du serveur de paramètres avec l'API Model.fit nécessite que le coordinateur utilise un objet tf.distribute.experimental.ParameterServerStrategy et un tf.keras.utils.experimental.DatasetCreator comme entrée. Semblable à l'utilisation de Model.fit sans stratégie, ou avec d'autres stratégies, le flux de travail implique la création et la compilation du modèle, la préparation des rappels, suivis d'un appel Model.fit .

Entraînement du serveur de paramètres avec une boucle d'entraînement personnalisée

Avec les boucles de formation personnalisées, la classe tf.distribute.experimental.coordinator.ClusterCoordinator est le composant clé utilisé pour le coordinateur.

L'API la plus importante fournie par l'objet ClusterCoordinator est schedule :

  • L'API de schedule met en file d'attente une fonction tf. et renvoie immédiatement une tf.function de type RemoteValue .
  • Les fonctions en file d'attente seront distribuées aux travailleurs distants dans les threads d'arrière-plan et leurs RemoteValue seront remplies de manière asynchrone.
  • Étant donné que schedule ne nécessite pas d'affectation de travailleur, la tf.function transmise peut être exécutée sur n'importe quel travailleur disponible.
  • Si le worker sur lequel elle est exécutée devient indisponible avant son achèvement, la fonction sera réessayée sur un autre worker disponible.
  • De ce fait et du fait que l'exécution d'une fonction n'est pas atomique, une fonction peut être exécutée plus d'une fois.

En plus de répartir les fonctions à distance, le ClusterCoordinator aide également à créer des ensembles de données sur tous les travailleurs et à reconstruire ces ensembles de données lorsqu'un travailleur récupère d'un échec.

Configuration du didacticiel

Le didacticiel se ramifiera dans Model.fit et les chemins de boucle de formation personnalisés, et vous pourrez choisir celui qui correspond à vos besoins. Les sections autres que "S'entraîner avec X" s'appliquent aux deux parcours.

pip install portpicker

Configuration du cluster

Comme mentionné ci-dessus, un cluster de formation de serveur de paramètres nécessite une tâche de coordinateur qui exécute votre programme de formation, un ou plusieurs nœuds de calcul et des tâches de serveur de paramètres qui exécutent des serveurs TensorFlow ( tf.distribute.Server et éventuellement une tâche d'évaluation supplémentaire qui exécute une évaluation parallèle. (voir la section d'évaluation du side-car ci-dessous). Les conditions requises pour les mettre en place sont les suivantes :

  • La tâche de coordinateur doit connaître les adresses et les ports de tous les autres serveurs TensorFlow, à l'exception de l'évaluateur.
  • Les nœuds de calcul et les serveurs de paramètres doivent savoir sur quel port ils doivent écouter. Par souci de simplicité, vous pouvez généralement transmettre les informations complètes du cluster lors de la création de serveurs TensorFlow sur ces tâches.
  • La tâche de l'évaluateur n'a pas besoin de connaître la configuration du cluster de formation. Si tel est le cas, il ne doit pas tenter de se connecter au cluster de formation.
  • Les travailleurs et les serveurs de paramètres doivent avoir des types de tâche comme "worker" et "ps" , respectivement. Le coordinateur doit utiliser "chief" comme type de tâche pour des raisons d'héritage.

Dans ce didacticiel, vous allez créer un cluster in-process afin que l'ensemble de la formation du serveur de paramètres puisse être exécuté dans Colab. Vous apprendrez à configurer de vrais clusters dans une section ultérieure.

Cluster en cours

Vous commencerez par créer plusieurs serveurs TensorFlow à l'avance et vous vous y connecterez ultérieurement. Notez que ce n'est que dans le but de la démonstration de ce tutoriel, et dans le cadre d'une formation réelle, les serveurs seront démarrés sur des machines "worker" et "ps" .

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

La configuration du cluster in-process est fréquemment utilisée dans les tests unitaires, comme ici .

Une autre option pour les tests locaux consiste à lancer des processus sur la machine locale. Consultez Formation multi-travailleurs avec Keras pour un exemple de cette approche.

Instancier une ParameterServerStrategy

Avant de vous plonger dans le code de formation, instancions un objet ParameterServerStrategy . Notez que cela est nécessaire, que vous procédiez avec Model.fit ou une boucle d'entraînement personnalisée. L'argument variable_partitioner sera expliqué dans la section Variable sharding .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

Afin d'utiliser des GPU pour la formation, allouez des GPU visibles à chaque travailleur. ParameterServerStrategy utilisera tous les GPU disponibles sur chaque travailleur, avec la restriction que tous les travailleurs doivent avoir le même nombre de GPU disponibles.

Partage variable

Le sharding variable fait référence au fractionnement d'une variable en plusieurs variables plus petites, appelées shards . Le partitionnement variable peut être utile pour répartir la charge du réseau lors de l'accès à ces partitions. Il est également utile de répartir le calcul et le stockage d'une variable normale sur plusieurs serveurs de paramètres.

Pour activer le partitionnement variable, vous pouvez transmettre un variable_partitioner lors de la construction d'un objet ParameterServerStrategy . Le variable_partitioner sera appelé chaque fois qu'une variable est créée et il est prévu de renvoyer le nombre de fragments le long de chaque dimension de la variable. Certains variable_partitioner prêts à l'emploi sont fournis, tels que tf.distribute.experimental.partitioners.MinSizePartitioner . Il est recommandé d'utiliser des partitionneurs basés sur la taille comme tf.distribute.experimental.partitioners.MinSizePartitioner pour éviter de partitionner de petites variables, ce qui pourrait avoir un impact négatif sur la vitesse de formation du modèle.

Lorsqu'un variable_partitioner est passé et si vous créez une variable directement sous strategy.scope() , il deviendra un type de conteneur avec une propriété variables qui donne accès à la liste des fragments. Dans la plupart des cas, ce conteneur sera automatiquement converti en Tensor en concaténant tous les fragments. Par conséquent, il peut être utilisé comme une variable normale. D'autre part, certaines méthodes TensorFlow telles que tf.nn.embedding_lookup fournissent une implémentation efficace pour ce type de conteneur et dans ces méthodes, la concaténation automatique sera évitée.

Veuillez consulter la documentation de l'API de tf.distribute.experimental.ParameterServerStrategy pour plus de détails.

Entraînement avec Model.fit

Keras fournit une API de formation facile à utiliser via Model.fit qui gère la boucle de formation sous le capot, avec la flexibilité de train_step et des rappels, qui fournissent des fonctionnalités telles que l'enregistrement de points de contrôle ou l'enregistrement de résumé pour TensorBoard. Avec Model.fit , le même code d'entraînement peut être utilisé pour d'autres stratégies avec un simple échange de l'objet de stratégie.

Des données d'entrée

Model.fit avec la formation du serveur de paramètres nécessite que les données d'entrée soient fournies dans un appelable qui prend un seul argument de type tf.distribute.InputContext et renvoie un tf.data.Dataset . Ensuite, créez un objet tf.keras.utils.experimental.DatasetCreator qui prend un tel callable et un objet tf.distribute.InputOptions facultatif via l'argument input_options .

Notez qu'il est recommandé de mélanger et de répéter les données avec la formation du serveur de paramètres et de spécifier steps_per_epoch dans l'appel fit afin que la bibliothèque connaisse les limites de l'époque.

Veuillez consulter le didacticiel sur l' entrée distribuée pour plus d'informations sur l'argument InputContext .

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

Le code dans dataset_fn sera appelé sur le périphérique d'entrée, qui est généralement le processeur, sur chacune des machines de travail.

Construction et compilation de modèles

Maintenant, vous allez créer un tf.keras.Model - un modèle trivial tf.keras.models.Sequential à des fins de démonstration - suivi d'un appel Model.compile pour incorporer des composants, tels qu'un optimiseur, des métriques ou des paramètres tels que steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Rappels et formation

Avant d'appeler model.fit pour la formation proprement dite, préparons les rappels nécessaires pour les tâches courantes, telles que :

  • ModelCheckpoint : pour enregistrer les poids du modèle.
  • BackupAndRestore : pour s'assurer que la progression de la formation est automatiquement sauvegardée et récupérée en cas d'indisponibilité du cluster (telle qu'un abandon ou une préemption) ; ou
  • TensorBoard : pour enregistrer les rapports d'avancement dans des fichiers de synthèse, qui sont visualisés dans l'outil TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f89984ca890>

Utilisation directe avec ClusterCoordinator (facultatif)

Même si vous choisissez le parcours de formation Model.fit , vous pouvez éventuellement instancier un objet tf.distribute.experimental.coordinator.ClusterCoordinator pour planifier d'autres fonctions que vous souhaitez exécuter sur les nœuds de calcul. Voir la section Entraînement avec une boucle d'entraînement personnalisée pour plus de détails et d'exemples.

Entraînement avec une boucle d'entraînement personnalisée

L'utilisation de boucles d'entraînement personnalisées avec tf.distribute.Strategy offre une grande flexibilité pour définir des boucles d'entraînement. Avec le ParameterServerStrategy défini ci-dessus (en tant que strategy ), vous utiliserez un tf.distribute.experimental.coordinator.ClusterCoordinator pour répartir l'exécution des étapes de formation aux travailleurs distants.

Ensuite, vous allez créer un modèle, définir un jeu de données et une fonction d'étape, comme vous l'avez fait dans la boucle d'entraînement avec d'autres tf.distribute.Strategy s. Vous pouvez trouver plus de détails dans le didacticiel Formation personnalisée avec tf.distribute.Strategy .

Pour garantir une prélecture efficace des ensembles de données, utilisez les API de création d'ensembles de données distribuées recommandées mentionnées dans la section Distribuer les étapes de formation aux travailleurs distants ci-dessous. Assurez-vous également d'appeler Strategy.run dans worker_fn pour tirer pleinement parti des GPU alloués aux travailleurs. Le reste des étapes est le même pour l'entraînement avec ou sans GPU.

Créons ces composants dans les étapes suivantes :

Configurer les données

Tout d'abord, écrivez une fonction qui crée un ensemble de données qui inclut la logique de prétraitement implémentée par les couches de prétraitement Keras .

Vous allez créer ces couches en dehors du dataset_fn mais appliquer la transformation à l'intérieur du dataset_fn , puisque vous encapsulerez le dataset_fn dans un tf.function , qui ne permet pas de créer des variables à l'intérieur.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return bool(asarray(a1 == a2).all())

Générez des exemples de jouets dans un ensemble de données :

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Ensuite, créez le jeu de données d'entraînement enveloppé dans un dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Construire le modèle

Créez ensuite le modèle et les autres objets. Assurez-vous de créer toutes les variables sous strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

Confirmons que l'utilisation de FixedShardsPartitioner divisé toutes les variables en deux partitions et que chaque partition a été affectée à différents serveurs de paramètres :

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Définir l'étape de formation

Troisièmement, créez l'étape de formation enveloppée dans un tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

Dans la fonction d'étape de formation ci-dessus, l'appel Strategy.run et Strategy.reduce dans step_fn peut prendre en charge plusieurs GPU par travailleur. Si les nœuds de calcul ont des GPU alloués, Strategy.run distribuera les jeux de données sur plusieurs réplicas.

Distribuez les étapes de formation aux télétravailleurs

Une fois tous les calculs définis par ParameterServerStrategy , vous utiliserez la classe tf.distribute.experimental.coordinator.ClusterCoordinator pour créer des ressources et distribuer les étapes de formation aux travailleurs distants.

Commençons par créer un objet ClusterCoordinator et transmettons l'objet stratégie :

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Ensuite, créez un jeu de données par travailleur et un itérateur. Dans le per_worker_dataset_fn ci-dessous, il est recommandé d'encapsuler le dataset_fn dans strategy.distribute_datasets_from_function pour permettre une prélecture efficace vers les GPU de manière transparente.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

La dernière étape consiste à distribuer le calcul aux travailleurs distants à l'aide ClusterCoordinator.schedule :

  • La méthode schedule met en file d'attente une tf.function et renvoie immédiatement une RemoteValue future. Les fonctions en file d'attente seront distribuées aux travailleurs distants dans les threads d'arrière-plan et la RemoteValue sera remplie de manière asynchrone.
  • La méthode join ( ClusterCoordinator.join ) peut être utilisée pour attendre que toutes les fonctions planifiées soient exécutées.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.543750.
Finished epoch 1, accuracy is 0.543750.
Finished epoch 2, accuracy is 0.950000.
Finished epoch 3, accuracy is 1.000000.

Voici comment récupérer le résultat d'une RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

Alternativement, vous pouvez lancer toutes les étapes et faire quelque chose en attendant la fin :

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Pour connaître le flux de travail complet de formation et de diffusion de cet exemple particulier, veuillez consulter ce test .

En savoir plus sur la création d'ensembles de données

L'ensemble de données dans le code ci-dessus est créé à l'aide de l'API ClusterCoordinator.create_per_worker_dataset ). Il crée un ensemble de données par travailleur et renvoie un objet conteneur. Vous pouvez appeler la méthode iter dessus pour créer un itérateur par travailleur. L'itérateur par travailleur contient un itérateur par travailleur et la tranche correspondante d'un travailleur sera remplacée dans l'argument d'entrée de la fonction transmise à la méthode ClusterCoordinator.schedule avant que la fonction ne soit exécutée sur un travailleur particulier.

Actuellement, la méthode ClusterCoordinator.schedule suppose que les nœuds de calcul sont équivalents et suppose donc que les ensembles de données sur différents nœuds de calcul sont identiques, sauf qu'ils peuvent être mélangés différemment s'ils contiennent une opération Dataset.shuffle . Pour cette raison, il est également recommandé que les ensembles de données soient répétés indéfiniment et que vous planifiez un nombre fini d'étapes au lieu de vous fier à l' OutOfRangeError d'un ensemble de données.

Une autre remarque importante est que les ensembles de données tf.data ne prennent pas en charge la sérialisation et la désérialisation implicites dans les limites des tâches. Il est donc important de créer l'ensemble de données complet dans la fonction transmise à ClusterCoordinator.create_per_worker_dataset .

Évaluation

Il existe plusieurs façons de définir et d'exécuter une boucle d'évaluation dans la formation distribuée. Chacun a ses propres avantages et inconvénients, comme décrit ci-dessous. La méthode d'évaluation en ligne est recommandée si vous n'avez pas de préférence.

Évaluation en ligne

Dans cette méthode, le coordinateur alterne formation et évaluation et c'est ainsi qu'on l'appelle évaluation en ligne .

L'évaluation en ligne présente plusieurs avantages. Par example:

  • Il peut prendre en charge de grands modèles d'évaluation et des ensembles de données d'évaluation qu'une seule tâche ne peut pas contenir.
  • Les résultats de l'évaluation peuvent être utilisés pour prendre des décisions pour la formation de l'époque suivante.

Il existe deux façons de mettre en œuvre l'évaluation en ligne : l'évaluation directe et l'évaluation distribuée.

  • Évaluation directe : pour les petits modèles et les ensembles de données d'évaluation, le coordinateur peut exécuter l'évaluation directement sur le modèle distribué avec l'ensemble de données d'évaluation sur le coordinateur :
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Évaluation distribuée : pour les grands modèles ou ensembles de données qu'il est impossible d'exécuter directement sur le coordinateur, la tâche du coordinateur peut distribuer des tâches d'évaluation aux agents via les méthodes ClusterCoordinator.schedule / ClusterCoordinator.join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

Évaluation du side-car

Une autre méthode est appelée évaluation parallèle dans laquelle vous créez une tâche d'évaluateur dédiée qui lit à plusieurs reprises les points de contrôle et exécute l'évaluation sur un dernier point de contrôle. Cela permet à votre programme d'entraînement de se terminer plus tôt si vous n'avez pas besoin de modifier votre boucle d'entraînement en fonction des résultats de l'évaluation. Cependant, il nécessite une tâche d'évaluation supplémentaire et des points de contrôle périodiques pour déclencher l'évaluation. Voici une boucle d'évaluation side-car possible :

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Clusters dans le monde réel

Dans un environnement de production réel, vous exécuterez toutes les tâches dans différents processus sur différentes machines. Le moyen le plus simple de configurer les informations de cluster sur chaque tâche consiste à définir les variables d'environnement "TF_CONFIG" et à utiliser un tf.distribute.cluster_resolver.TFConfigClusterResolver pour analyser "TF_CONFIG" .

Pour une description générale des variables d'environnement "TF_CONFIG" , reportez-vous au guide de formation distribuée .

Si vous démarrez vos tâches de formation à l'aide de Kubernetes ou d'autres modèles de configuration, il est très probable que ces modèles aient déjà défini “TF_CONFIG" pour vous.

Définissez la variable d'environnement "TF_CONFIG"

Supposons que vous ayez 3 workers et 2 serveurs de paramètres, le "TF_CONFIG" du worker 1 peut être :

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

Le "TF_CONFIG" de l'évaluateur peut être :

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

La partie "cluster" dans la chaîne "TF_CONFIG" ci-dessus pour l'évaluateur est facultative.

Si vous utilisez le même binaire pour toutes les tâches

Si vous préférez exécuter toutes ces tâches à l'aide d'un seul binaire, vous devrez laisser votre programme se ramifier en différents rôles dès le début :

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

Le code suivant démarre un serveur TensorFlow et attend :

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Gérer l'échec d'une tâche

Échec du travailleur

tf.distribute.experimental.coordinator.ClusterCoordinator ou Model.fit fournissent une tolérance aux pannes intégrée en cas d'échec du worker. Lors de la récupération du nœud de calcul, la fonction d'ensemble de données précédemment fournie (soit pour ClusterCoordinator.create_per_worker_dataset pour une boucle de formation personnalisée, soit tf.keras.utils.experimental.DatasetCreator pour Model.fit ) sera appelée sur les nœuds de calcul pour recréer les ensembles de données.

Échec du serveur de paramètres ou du coordinateur

Cependant, lorsque le coordinateur voit une erreur de serveur de paramètres, il déclenche immédiatement une UnavailableError ou une AbortedError . Vous pouvez redémarrer le coordinateur dans ce cas. Le coordinateur lui-même peut également devenir indisponible. Par conséquent, certains outillages sont recommandés afin de ne pas perdre la progression de l'entraînement :

  • Pour Model.fit , vous devez utiliser un rappel BackupAndRestore , qui gère automatiquement la progression de l'enregistrement et de la restauration. Voir la section Rappels et formation ci-dessus pour un exemple.

  • Pour une boucle de formation personnalisée, vous devez contrôler périodiquement les variables de modèle et charger les variables de modèle à partir d'un point de contrôle, le cas échéant, avant le début de la formation. La progression de la formation peut être déduite approximativement à partir d' optimizer.iterations si un optimiseur est contrôlé :

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Récupérer une RemoteValue

La récupération d'une RemoteValue est garantie de réussir si une fonction est exécutée avec succès. En effet, actuellement, la valeur de retour est immédiatement copiée dans le coordinateur après l'exécution d'une fonction. En cas d'échec d'un worker pendant la copie, la fonction sera réessayée sur un autre worker disponible. Par conséquent, si vous souhaitez optimiser les performances, vous pouvez planifier des fonctions sans valeur de retour.

Rapport d'erreur

Une fois que le coordinateur voit une erreur telle UnavailableError des serveurs de paramètres ou d'autres erreurs d'application telles qu'un InvalidArgument de tf.debugging.check_numerics , il annulera toutes les fonctions en attente et en file d'attente avant de générer l'erreur. La récupération de leurs RemoteValue correspondantes lèvera une CancelledError .

Une fois qu'une erreur est générée, le coordinateur ne générera pas la même erreur ni aucune erreur provenant de fonctions annulées.

Amélioration des performances

Il existe plusieurs raisons possibles si vous rencontrez des problèmes de performances lorsque vous vous entraînez avec ParameterServerStrategy et ClusterResolver .

Une raison courante est que les serveurs de paramètres ont une charge déséquilibrée et que certains serveurs de paramètres fortement chargés ont atteint leur capacité. Il peut également y avoir plusieurs causes profondes. Voici quelques méthodes simples pour atténuer ce problème :

  1. Fractionnez vos grandes variables de modèle en spécifiant un variable_partitioner lors de la construction d'un ParameterServerStrategy .
  2. Si possible, évitez de créer une variable hotspot requise par tous les serveurs de paramètres en une seule étape. Par exemple, utilisez un taux d'apprentissage constant ou une sous-classe tf.keras.optimizers.schedules.LearningRateSchedule dans les optimiseurs puisque le comportement par défaut est que le taux d'apprentissage deviendra une variable placée sur un serveur de paramètres particulier et demandée par tous les autres serveurs de paramètres à chaque étape .
  3. Mélangez vos grands vocabulaires avant de les passer aux couches de prétraitement Keras.

Une autre raison possible des problèmes de performance est le coordinateur. Votre première implémentation de schedule / join est basée sur Python et peut donc avoir une surcharge de threading. De plus, la latence entre le coordinateur et les travailleurs peut être importante. Si c'est le cas,

  • Pour Model.fit , vous pouvez définir l'argument steps_per_execution fourni dans Model.compile sur une valeur supérieure à 1.

  • Pour une boucle d'entraînement personnalisée, vous pouvez regrouper plusieurs étapes dans une seule tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Au fur et à mesure que la bibliothèque est optimisée, nous espérons que la plupart des utilisateurs n'auront pas à emballer manuellement les étapes à l'avenir.

De plus, une petite astuce pour améliorer les performances consiste à planifier des fonctions sans valeur de retour, comme expliqué dans la section de gestion des échecs de tâche ci-dessus.

Limites connues

La plupart des limitations connues sont déjà couvertes dans les sections ci-dessus. Cette section fournit un résumé.

ParameterServerStrategy général

  • os.environment["grpc_fail_fast"]="use_caller" est nécessaire sur chaque tâche, y compris le coordinateur, pour que la tolérance aux pannes fonctionne correctement.
  • La formation de serveur de paramètres synchrone n'est pas prise en charge.
  • Il est généralement nécessaire de regrouper plusieurs étapes dans une seule fonction pour obtenir des performances optimales.
  • Il n'est pas pris en charge de charger un saved_model via tf.saved_model.load contenant des variables fragmentées. Notez que le chargement d'un tel save_model à l'aide de TensorFlow Serving devrait fonctionner.
  • Il n'est pas pris en charge de charger un point de contrôle contenant des variables d'emplacement d'optimiseur partitionnées dans un nombre différent de partitions.
  • Il n'est pas pris en charge de récupérer d'une défaillance du serveur de paramètres sans redémarrer la tâche du coordinateur.
  • L'utilisation de tf.lookup.StaticHashTable (qui est couramment utilisé par certaines couches de prétraitement Keras, telles que tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookup et tf.keras.layers.TextVectorization ) entraîne des ressources placées sur le coordinateur à ce moment avec une formation de serveur de paramètres. Cela a des implications sur les performances pour les RPC de recherche des nœuds de calcul vers le coordinateur. Il s'agit d'une haute priorité actuelle à traiter.

Model.fit spécificités

  • L'argument steps_per_epoch est requis dans Model.fit . Vous pouvez sélectionner une valeur qui fournit des intervalles appropriés dans une époque.
  • ParameterServerStrategy ne prend pas en charge les rappels personnalisés qui ont des appels au niveau du lot pour des raisons de performances. Vous devez convertir ces appels en appels au niveau de l'époque avec steps_per_epoch convenablement choisis, de sorte qu'ils soient appelés à chaque nombre d' steps_per_epoch . Les rappels intégrés ne sont pas affectés : leurs appels au niveau du lot ont été modifiés pour être performants. La prise en charge des appels au niveau des lots pour ParameterServerStrategy est en cours de planification.
  • Pour la même raison, contrairement à d'autres stratégies, la barre de progression et les métriques ne sont enregistrées qu'aux limites de l'époque.
  • run_eagerly n'est pas pris en charge.

Spécificités de la boucle d'entraînement personnalisée