Training del server dei parametri con ParameterServerStrategy

Visualizza su TensorFlow.org Esegui in Google Colab Visualizza l'origine su GitHub Scarica quaderno

Panoramica

L' addestramento del server dei parametri è un metodo parallelo ai dati comune per aumentare l'addestramento del modello su più macchine.

Un cluster di formazione del server dei parametri è costituito da lavoratori e server dei parametri . Le variabili vengono create sui server dei parametri e vengono lette e aggiornate dagli operatori in ogni passaggio. Per impostazione predefinita, i lavoratori leggono e aggiornano queste variabili in modo indipendente senza sincronizzarsi tra loro. Questo è il motivo per cui a volte il training in stile server dei parametri viene chiamato training asincrono .

In TensorFlow 2, l'addestramento del server dei parametri è basato sulla classe tf.distribute.experimental.ParameterServerStrategy , che distribuisce i passaggi dell'addestramento a un cluster scalabile fino a migliaia di lavoratori (accompagnato da server dei parametri).

Metodi di formazione supportati

Esistono due principali metodi di formazione supportati:

Un cluster con lavori e attività

Indipendentemente dall'API scelta ( Model.fit o un ciclo di formazione personalizzato), la formazione distribuita in TensorFlow 2 prevede: un 'cluster' con diversi 'jobs' e ciascuno dei lavori può avere uno o più 'tasks' .

Quando si utilizza l'addestramento del server dei parametri, si consiglia di disporre di:

  • Un lavoro di coordinatore (che ha il nome del lavoro chief )
  • Lavori con più lavoratori (nome del lavoro worker ); e
  • Più lavori del server di parametri (nome lavoro ps )

Mentre il coordinatore crea risorse, invia attività di formazione, scrive checkpoint e si occupa degli errori delle attività, i lavoratori e i server dei parametri eseguono tf.distribute.Server che ascolta le richieste del coordinatore.

Addestramento del server dei parametri con l'API Model.fit

L'addestramento del server dei parametri con l'API Model.fit richiede che il coordinatore utilizzi un oggetto tf.distribute.experimental.ParameterServerStrategy e un tf.keras.utils.experimental.DatasetCreator come input. Simile all'utilizzo di Model.fit senza strategia o con altre strategie, il flusso di lavoro prevede la creazione e la compilazione del modello, la preparazione dei callback, seguiti da una chiamata Model.fit .

Training del server dei parametri con un ciclo di training personalizzato

Con cicli di formazione personalizzati, la classe tf.distribute.experimental.coordinator.ClusterCoordinator è il componente chiave utilizzato per il coordinatore.

L'API più importante fornita dall'oggetto ClusterCoordinator è schedule :

  • L'API di schedule accoda una tf.function e restituisce immediatamente un RemoteValue simile al futuro.
  • Le funzioni in coda verranno inviate ai lavoratori remoti nei thread in background e i RemoteValue verranno riempiti in modo asincrono.
  • Poiché la schedule non richiede l'assegnazione di un lavoratore, la tf.function passata può essere eseguita su qualsiasi lavoratore disponibile.
  • Se il lavoratore su cui viene eseguito diventa non disponibile prima del suo completamento, la funzione verrà ritentata su un altro lavoratore disponibile.
  • A causa di questo fatto e del fatto che l'esecuzione della funzione non è atomica, una funzione può essere eseguita più di una volta.

Oltre a inviare funzioni remote, ClusterCoordinator aiuta anche a creare set di dati su tutti i lavoratori e ricostruire questi set di dati quando un lavoratore si riprende da un guasto.

Configurazione dell'esercitazione

Il tutorial si dirama in Model.fit e percorsi di ciclo di formazione personalizzati e puoi scegliere quello che si adatta alle tue esigenze. Sezioni diverse da "Formazione con X" sono applicabili a entrambi i percorsi.

pip install portpicker

Configurazione del cluster

Come accennato in precedenza, un cluster di formazione del server dei parametri richiede un'attività del coordinatore che esegua il programma di formazione, uno o più lavoratori e attività del server dei parametri che eseguono i server TensorFlow — tf.distribute.Server — ed eventualmente un'attività di valutazione aggiuntiva che esegua la valutazione side-car (vedi la sezione di valutazione del sidecar di seguito). I requisiti per configurarli sono:

  • L'attività del coordinatore deve conoscere gli indirizzi e le porte di tutti gli altri server TensorFlow ad eccezione del valutatore.
  • Gli operatori e i server dei parametri devono sapere quale porta devono ascoltare. Per motivi di semplicità, di solito è possibile trasferire le informazioni complete sul cluster durante la creazione di server TensorFlow su queste attività.
  • L'attività di valutazione non deve conoscere l'impostazione del cluster di formazione. In tal caso, non dovrebbe tentare di connettersi al cluster di formazione.
  • I ruoli di lavoro e i server dei parametri devono avere tipi di attività rispettivamente come "worker" e "ps" . Il coordinatore dovrebbe utilizzare "chief" come tipo di attività per motivi legacy.

In questo tutorial creerai un cluster in-process in modo che l'intero training del server dei parametri possa essere eseguito in Colab. Imparerai come impostare cluster reali in una sezione successiva.

Cluster in corso

Inizierai creando diversi server TensorFlow in anticipo e ti connetterai ad essi in un secondo momento. Nota che questo è solo per lo scopo della dimostrazione di questo tutorial e nella formazione reale i server verranno avviati su macchine "worker" e "ps" .

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

La configurazione del cluster in-process viene spesso utilizzata negli unit test, come qui .

Un'altra opzione per i test locali consiste nell'avviare i processi sul computer locale: per un esempio di questo approccio, consulta Formazione multi-lavoratore con Keras .

Istanziare una ParameterServerStrategy

Prima di approfondire il codice di addestramento, istanziare un oggetto ParameterServerStrategy . Tieni presente che ciò è necessario indipendentemente dal fatto che tu stia procedendo con Model.fit o con un ciclo di formazione personalizzato. L'argomento variable_partitioner verrà spiegato nella sezione Partizionamento variabile delle variabili.

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

Per utilizzare le GPU per la formazione, alloca le GPU visibili a ciascun lavoratore. ParameterServerStrategy utilizzerà tutte le GPU disponibili su ogni lavoratore, con la limitazione che tutti i lavoratori dovrebbero avere lo stesso numero di GPU disponibili.

Frazionamento variabile

Il partizionamento orizzontale delle variabili si riferisce alla divisione di una variabile in più variabili più piccole, chiamate shard . Il partizionamento orizzontale variabile può essere utile per distribuire il carico di rete durante l'accesso a questi frammenti. È anche utile distribuire il calcolo e l'archiviazione di una variabile normale su più server di parametri.

Per abilitare il partizionamento orizzontale delle variabili, puoi passare un variable_partitioner durante la costruzione di un oggetto ParameterServerStrategy . Il variable_partitioner verrà invocato ogni volta che viene creata una variabile e dovrebbe restituire il numero di shard lungo ciascuna dimensione della variabile. Vengono forniti alcuni variable_partitioner predefiniti come tf.distribute.experimental.partitioners.MinSizePartitioner . Si consiglia di utilizzare partizionatori basati sulle dimensioni come tf.distribute.experimental.partitioners.MinSizePartitioner per evitare il partizionamento di piccole variabili, che potrebbero avere un impatto negativo sulla velocità di training del modello.

Quando viene passato un variable_partitioner e se crei una variabile direttamente in strategy.scope() , diventerà un tipo di contenitore con una proprietà delle variables che fornisce l'accesso all'elenco degli shard. Nella maggior parte dei casi, questo contenitore verrà automaticamente convertito in un Tensor concatenando tutti gli shard. Di conseguenza, può essere utilizzata come una variabile normale. D'altra parte, alcuni metodi TensorFlow come tf.nn.embedding_lookup forniscono un'implementazione efficiente per questo tipo di contenitore e in questi metodi verrà evitata la concatenazione automatica.

Consulta i documenti API di tf.distribute.experimental.ParameterServerStrategy per maggiori dettagli.

Allenarsi con Model.fit

Keras fornisce un'API di addestramento di facile utilizzo tramite Model.fit che gestisce il ciclo di addestramento sotto il cofano, con la flessibilità di train_step e callback, che forniscono funzionalità come il salvataggio del checkpoint o il salvataggio di riepilogo per TensorBoard. Con Model.fit , lo stesso codice di addestramento può essere utilizzato per altre strategie con un semplice scambio dell'oggetto strategia.

Dati in ingresso

Model.fit con il training del server dei parametri richiede che i dati di input siano forniti in un callable che accetta un singolo argomento di tipo tf.distribute.InputContext e restituisce un tf.data.Dataset . Quindi, crea un oggetto tf.keras.utils.experimental.DatasetCreator che accetta tale callable e un oggetto tf.distribute.InputOptions opzionale tramite l'argomento input_options .

Si noti che si consiglia di mescolare e ripetere i dati con l'addestramento del server dei parametri e specificare steps_per_epoch nella chiamata fit in modo che la libreria conosca i limiti di epoch.

Consulta l'esercitazione sull'input distribuito per ulteriori informazioni sull'argomento InputContext .

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

Il codice in dataset_fn verrà richiamato sul dispositivo di input, che di solito è la CPU, su ciascuna delle macchine worker.

Costruzione e compilazione del modello

Ora creerai un tf.keras.Model , un banale modello tf.keras.models.Sequential a scopo dimostrativo, seguito da una chiamata Model.compile per incorporare componenti, come un ottimizzatore, metriche o parametri come steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Richiami e formazione

Prima di chiamare model.fit per la formazione vera e propria, prepariamo i callback necessari per attività comuni, come ad esempio:

  • ModelCheckpoint : per salvare i pesi del modello.
  • BackupAndRestore : per assicurarsi che l'avanzamento della formazione venga automaticamente eseguito il backup e ripristinato se il cluster si verifica indisponibilità (come interruzione o prelazione); o
  • TensorBoard : per salvare i rapporti di avanzamento in file di riepilogo, che vengono visualizzati nello strumento TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f89984ca890>

Utilizzo diretto con ClusterCoordinator (opzionale)

Anche se scegli il percorso di formazione Model.fit , puoi opzionalmente istanziare un oggetto tf.distribute.experimental.coordinator.ClusterCoordinator per programmare altre funzioni che vorresti fossero eseguite sui lavoratori. Per ulteriori dettagli ed esempi, vedere la sezione Formazione con un ciclo di formazione personalizzato .

Allenamento con un ciclo di allenamento personalizzato

L'utilizzo di cicli di formazione personalizzati con tf.distribute.Strategy offre una grande flessibilità per definire i cicli di formazione. Con la ParameterServerStrategy definita sopra (come strategy ), utilizzerai un tf.distribute.experimental.coordinator.ClusterCoordinator per inviare l'esecuzione dei passaggi di formazione ai lavoratori remoti.

Quindi, creerai un modello, definirai un set di dati e una funzione di passaggio, come hai fatto nel ciclo di formazione con altri tf.distribute.Strategy s. Puoi trovare maggiori dettagli nel tutorial Formazione personalizzata con tf.distribute.Strategy .

Per garantire un precaricamento efficiente dei set di dati, utilizzare le API di creazione di set di dati distribuite consigliate menzionate nella sezione Invio dei passaggi di formazione ai lavoratori remoti di seguito. Inoltre, assicurati di chiamare Strategy.run all'interno di worker_fn per sfruttare appieno le GPU assegnate ai lavoratori. Il resto dei passaggi è lo stesso per l'allenamento con o senza GPU.

Creiamo questi componenti nei seguenti passaggi:

Imposta i dati

Innanzitutto, scrivi una funzione che crei un set di dati che includa la logica di preelaborazione implementata dai livelli di preelaborazione di Keras .

Creerai questi livelli al di fuori del dataset_fn ma applicherai la trasformazione all'interno del dataset_fn , poiché avvolgerai il dataset_fn in un tf.function , che non consente la creazione di variabili al suo interno.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return bool(asarray(a1 == a2).all())

Genera esempi di giocattoli in un set di dati:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Quindi, crea il set di dati di addestramento racchiuso in un dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Costruisci il modello

Quindi, crea il modello e altri oggetti. Assicurati di creare tutte le variabili in strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

Confermiamo che l'uso di FixedShardsPartitioner diviso tutte le variabili in due shard e ogni shard è stato assegnato a diversi server di parametri:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Definisci la fase di formazione

Terzo, crea la fase di formazione racchiusa in una tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

Nella funzione del passaggio di addestramento sopra, la chiamata di Strategy.run e Strategy.reduce in step_fn può supportare più GPU per lavoratore. Se ai lavoratori sono assegnate GPU, Strategy.run distribuirà i set di dati su più repliche.

Inviare le fasi di formazione ai lavoratori remoti

Dopo che tutti i calcoli sono stati definiti da ParameterServerStrategy , utilizzerai la classe tf.distribute.experimental.coordinator.ClusterCoordinator per creare risorse e distribuire i passaggi di formazione ai lavoratori remoti.

Creiamo prima un oggetto ClusterCoordinator e passiamo l'oggetto strategia:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Quindi, crea un set di dati per lavoratore e un iteratore. In per_worker_dataset_fn seguito, si consiglia di eseguire il wrapping di dataset_fn in strategy.distribute_datasets_from_function per consentire un precaricamento efficiente alle GPU senza interruzioni.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

Il passaggio finale consiste nel distribuire il calcolo ai lavoratori remoti utilizzando ClusterCoordinator.schedule :

  • Il metodo di schedule accoda una tf.function e restituisce immediatamente un RemoteValue simile al futuro. Le funzioni in coda verranno inviate ai lavoratori remoti nei thread in background e RemoteValue verrà compilato in modo asincrono.
  • Il metodo join ( ClusterCoordinator.join ) può essere utilizzato per attendere l'esecuzione di tutte le funzioni pianificate.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.543750.
Finished epoch 1, accuracy is 0.543750.
Finished epoch 2, accuracy is 0.950000.
Finished epoch 3, accuracy is 1.000000.

Ecco come recuperare il risultato di un RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

In alternativa, puoi avviare tutti i passaggi e fare qualcosa in attesa del completamento:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Per il flusso di lavoro completo di formazione e servizio per questo particolare esempio, dai un'occhiata a questo test .

Ulteriori informazioni sulla creazione di set di dati

Il set di dati nel codice precedente viene creato utilizzando l'API ClusterCoordinator.create_per_worker_dataset ). Crea un set di dati per lavoratore e restituisce un oggetto contenitore. Puoi chiamare il metodo iter su di esso per creare un iteratore per lavoratore. L'iteratore per lavoratore contiene un iteratore per lavoratore e la sezione corrispondente di un lavoratore verrà sostituita nell'argomento di input della funzione passata al metodo ClusterCoordinator.schedule prima che la funzione venga eseguita su un particolare lavoratore.

Attualmente, il metodo ClusterCoordinator.schedule presuppone che i lavoratori siano equivalenti e quindi presuppone che i set di dati su lavoratori diversi siano gli stessi, tranne per il fatto che potrebbero essere mescolati in modo diverso se contengono un'operazione Dataset.shuffle . Per questo motivo, si consiglia inoltre di ripetere i set di dati all'infinito e di pianificare un numero limitato di passaggi invece di fare affidamento su OutOfRangeError da un set di dati.

Un'altra nota importante è che i set di dati tf.data non supportano la serializzazione e la deserializzazione implicite attraverso i limiti delle attività. Quindi è importante creare l'intero set di dati all'interno della funzione passata a ClusterCoordinator.create_per_worker_dataset .

Valutazione

Esiste più di un modo per definire ed eseguire un ciclo di valutazione nella formazione distribuita. Ognuno ha i suoi pro e contro, come descritto di seguito. Il metodo di valutazione in linea è consigliato se non hai una preferenza.

Valutazione in linea

In questo metodo, il coordinatore alterna formazione e valutazione e quindi viene chiamato valutazione in linea .

Ci sono diversi vantaggi della valutazione in linea. Per esempio:

  • Può supportare grandi modelli di valutazione e set di dati di valutazione che una singola attività non può contenere.
  • I risultati della valutazione possono essere utilizzati per prendere decisioni per l'allenamento dell'epoca successiva.

Esistono due modi per implementare la valutazione in linea: valutazione diretta e valutazione distribuita.

  • Valutazione diretta : per piccoli modelli e set di dati di valutazione, il coordinatore può eseguire la valutazione direttamente sul modello distribuito con il set di dati di valutazione sul coordinatore:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Valutazione distribuita : per modelli o set di dati di grandi dimensioni che non è possibile eseguire direttamente sul coordinatore, l'attività del coordinatore può distribuire compiti di valutazione ai lavoratori tramite i metodi ClusterCoordinator.schedule / ClusterCoordinator.join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

Valutazione sidecar

Un altro metodo è chiamato valutazione side-car in cui si crea un'attività di valutazione dedicata che legge ripetutamente i checkpoint ed esegue la valutazione su un checkpoint più recente. Consente al programma di allenamento di terminare in anticipo se non è necessario modificare il ciclo di allenamento in base ai risultati della valutazione. Tuttavia, richiede un'attività di valutazione aggiuntiva e un checkpoint periodico per attivare la valutazione. Di seguito è riportato un possibile ciclo di valutazione del sidecar:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Cluster nel mondo reale

In un ambiente di produzione reale, eseguirai tutte le attività in processi diversi su macchine diverse. Il modo più semplice per configurare le informazioni del cluster su ciascuna attività consiste nell'impostare le variabili di ambiente "TF_CONFIG" e utilizzare un tf.distribute.cluster_resolver.TFConfigClusterResolver per analizzare "TF_CONFIG" .

Per una descrizione generale delle variabili di ambiente "TF_CONFIG" , fare riferimento alla Guida alla formazione distribuita .

Se inizi le tue attività di formazione utilizzando Kubernetes o altri modelli di configurazione, è molto probabile che questi modelli abbiano già impostato “TF_CONFIG" per te.

Impostare la variabile di ambiente "TF_CONFIG" .

Supponiamo di avere 3 lavoratori e 2 server dei parametri, il "TF_CONFIG" del lavoratore 1 può essere:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

Il "TF_CONFIG" del valutatore può essere:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

La parte "cluster" nella stringa "TF_CONFIG" sopra per il valutatore è facoltativa.

Se usi lo stesso binario per tutte le attività

Se preferisci eseguire tutte queste attività utilizzando un singolo binario, all'inizio dovrai lasciare che il tuo programma si ramifichi in ruoli diversi:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

Il codice seguente avvia un server TensorFlow e attende:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Gestione del fallimento dell'attività

Fallimento del lavoratore

tf.distribute.experimental.coordinator.ClusterCoordinator o Model.fit forniscono una tolleranza agli errori incorporata per il fallimento del lavoratore. Al momento del ripristino del lavoratore, la funzione del set di dati fornita in precedenza (a ClusterCoordinator.create_per_worker_dataset per un ciclo di addestramento personalizzato o tf.keras.utils.experimental.DatasetCreator per Model.fit ) verrà richiamata sui lavoratori per ricreare i set di dati.

Errore del server dei parametri o del coordinatore

Tuttavia, quando il coordinatore vede un errore del server dei parametri, solleverà immediatamente un UnavailableError o AbortedError . In questo caso puoi riavviare il coordinatore. Anche il coordinatore stesso può diventare non disponibile. Pertanto, si consiglia di utilizzare alcuni strumenti per non perdere i progressi della formazione:

  • Per Model.fit , dovresti usare una richiamata BackupAndRestore , che gestisce automaticamente il salvataggio e il ripristino dell'avanzamento. Per un esempio, vedere la sezione Richiamate e formazione sopra.

  • Per un ciclo di addestramento personalizzato, è necessario controllare periodicamente le variabili del modello e caricare le variabili del modello da un punto di controllo, se presente, prima dell'inizio dell'addestramento. L'avanzamento dell'addestramento può essere dedotto approssimativamente optimizer.iterations se viene eseguito un checkpoint di un ottimizzatore:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Recupero di un RemoteValue

Il recupero di un RemoteValue è garantito se una funzione viene eseguita correttamente. Questo perché attualmente il valore restituito viene immediatamente copiato nel coordinatore dopo l'esecuzione di una funzione. Se si verifica un errore di lavoro durante la copia, la funzione verrà ritentata su un altro lavoratore disponibile. Pertanto, se si desidera ottimizzare le prestazioni, è possibile pianificare le funzioni senza un valore restituito.

Segnalazione errori

Una volta che il coordinatore vede un errore come UnavailableError dai server dei parametri o altri errori dell'applicazione come InvalidArgument da tf.debugging.check_numerics , cancellerà tutte le funzioni in sospeso e in coda prima di generare l'errore. Recuperare i loro RemoteValue corrispondenti solleverà un CancelledError .

Dopo che è stato segnalato un errore, il coordinatore non solleverà lo stesso errore o qualsiasi errore dalle funzioni annullate.

Miglioramento delle prestazioni

Esistono diversi possibili motivi se si verificano problemi di prestazioni durante l'allenamento con ParameterServerStrategy e ClusterResolver .

Un motivo comune è che i server dei parametri hanno un carico sbilanciato e alcuni server dei parametri molto caricati hanno raggiunto la capacità. Possono esserci anche più cause alla radice. Alcuni semplici metodi per mitigare questo problema sono:

  1. Shard le tue variabili di modello di grandi dimensioni specificando un variable_partitioner durante la costruzione di un ParameterServerStrategy .
  2. Se possibile, evitare di creare una variabile hotspot richiesta da tutti i server dei parametri in un unico passaggio. Ad esempio, utilizzare un tasso di apprendimento costante o una sottoclasse tf.keras.optimizers.schedules.LearningRateSchedule negli ottimizzatori poiché il comportamento predefinito è che il tasso di apprendimento diventerà una variabile posizionata su un particolare server di parametri e richiesta da tutti gli altri server di parametri in ogni passaggio .
  3. Mescola i tuoi grandi vocabolari prima di passarli ai livelli di preelaborazione di Keras.

Un'altra possibile ragione per problemi di prestazioni è il coordinatore. La tua prima implementazione di schedule / join è basata su Python e quindi potrebbe avere un sovraccarico di threading. Anche la latenza tra il coordinatore e i lavoratori può essere grande. Se questo è il caso,

  • Per Model.fit , puoi impostare l'argomento steps_per_execution fornito in Model.compile su un valore maggiore di 1.

  • Per un ciclo di formazione personalizzato, puoi raggruppare più passaggi in un unico tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Poiché la libreria è ulteriormente ottimizzata, si spera che la maggior parte degli utenti non debba impacchettare manualmente i passaggi in futuro.

Inoltre, un piccolo trucco per il miglioramento delle prestazioni consiste nel pianificare le funzioni senza un valore restituito, come spiegato nella sezione precedente relativa alla gestione degli errori dell'attività.

Limiti noti

La maggior parte delle limitazioni note sono già trattate nelle sezioni precedenti. Questa sezione fornisce un riepilogo.

ParameterServerStrategy generale

  • os.environment["grpc_fail_fast"]="use_caller" è necessario in ogni attività, incluso il coordinatore, per far funzionare correttamente la tolleranza agli errori.
  • L'addestramento sincrono del server dei parametri non è supportato.
  • Di solito è necessario raggruppare più passaggi in un'unica funzione per ottenere prestazioni ottimali.
  • Non è supportato caricare un modello_salvato tramite tf.saved_model.load contenente variabili partizionate. Nota che il caricamento di un tale modello_salvato utilizzando TensorFlow Serving dovrebbe funzionare.
  • Non è supportato caricare un checkpoint contenente variabili di slot dell'ottimizzatore partizionato in un numero diverso di partizioni.
  • Non è supportato il ripristino da un errore del server dei parametri senza riavviare l'attività del coordinatore.
  • L'utilizzo di tf.lookup.StaticHashTable (che è comunemente impiegato da alcuni livelli di preelaborazione Keras, come tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookup e tf.keras.layers.TextVectorization ) determina risorse posizionate su il coordinatore in questo momento con la formazione del server dei parametri. Ciò ha implicazioni sulle prestazioni per la ricerca degli RPC dai lavoratori al coordinatore. Questa è un'alta priorità attuale da affrontare.

Specifiche Model.fit

  • L'argomento steps_per_epoch è richiesto in Model.fit . È possibile selezionare un valore che fornisca intervalli appropriati in un'epoca.
  • ParameterServerStrategy non supporta i callback personalizzati con chiamate a livello di batch per motivi di prestazioni. Dovresti convertire quelle chiamate in chiamate a livello di epoca con steps_per_epoch opportunamente selezionati, in modo che siano chiamate ogni numero di steps_per_epoch di passi. I callback incorporati non sono interessati: le loro chiamate a livello di batch sono state modificate per essere performanti. È in programma il supporto delle chiamate a livello di batch per ParameterServerStrategy .
  • Per lo stesso motivo, a differenza di altre strategie, la barra di avanzamento e le metriche vengono registrate solo ai limiti dell'epoca.
  • run_eagerly non è supportato.

Specifiche del ciclo di formazione personalizzato