Visualizza su TensorFlow.org | Esegui in Google Colab | Visualizza l'origine su GitHub | Scarica quaderno |
Questa guida mostra come migrare il flusso di lavoro di formazione distribuito multi-lavoratore da TensorFlow 1 a TensorFlow 2.
Per eseguire la formazione multi-operatore con CPU/GPU:
- In TensorFlow 1, tradizionalmente utilizzi le API
tf.estimator.train_and_evaluate
etf.estimator.Estimator
. - In TensorFlow 2, usa le API Keras per scrivere il modello, la funzione di perdita, l'ottimizzatore e le metriche. Quindi, distribuisci la formazione con l'API Keras
Model.fit
o un ciclo di formazione personalizzato (contf.GradientTape
) tra più lavoratori contf.distribute.experimental.ParameterServerStrategy
otf.distribute.MultiWorkerMirroredStrategy
. Per maggiori dettagli, fare riferimento ai seguenti tutorial:
Impostare
Inizia con alcune importazioni necessarie e un semplice set di dati a scopo dimostrativo:
# The notebook uses a dataset instance for `Model.fit` with
# `ParameterServerStrategy`, which depends on symbols in TF 2.7.
# Install a utility needed for this demonstration
!pip install portpicker
import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]
Avrai bisogno della variabile di ambiente di configurazione 'TF_CONFIG'
per l'addestramento su più macchine in TensorFlow. Utilizzare 'TF_CONFIG'
per specificare gli indirizzi 'cluster'
e 'task'
. (Ulteriori informazioni nella guida alla formazione distribuita .)
import json
import os
tf_config = {
'cluster': {
'chief': ['localhost:11111'],
'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
'ps': ['localhost:12121', 'localhost:13131'],
},
'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Usa l'istruzione del
per rimuovere la variabile (ma nella formazione multi-lavoratore nel mondo reale in TensorFlow 1, non dovrai farlo):
del os.environ['TF_CONFIG']
TensorFlow 1: formazione distribuita multi-lavoratore con API tf.estimator
Il seguente frammento di codice mostra il flusso di lavoro canonico della formazione multi-lavoratore in TF1: utilizzerai tf.estimator.Estimator
, tf.estimator.TrainSpec
, tf.estimator.EvalSpec
e l'API tf.estimator.train_and_evaluate
per distribuire l'allenamento:
def _input_fn():
return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)
def _eval_input_fn():
return tf1.data.Dataset.from_tensor_slices(
(eval_features, eval_labels)).batch(1)
def _model_fn(features, labels, mode):
logits = tf1.layers.Dense(1)(features)
loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
optimizer = tf1.train.AdagradOptimizer(0.05)
train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
INFO:tensorflow:Using default config. WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpvfb91q_5 INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpvfb91q_5', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } , '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1} INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Running training and evaluation locally (non-distributed). INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/training_util.py:401: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version. Instructions for updating: Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts. INFO:tensorflow:Calling model_fn. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/adagrad.py:143: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version. Instructions for updating: Call initializer instance with the dtype argument instead of passing it to the constructor INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Create CheckpointSaverHook. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpvfb91q_5/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... INFO:tensorflow:loss = 0.038075272, step = 0 INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 3... INFO:tensorflow:Saving checkpoints for 3 into /tmp/tmpvfb91q_5/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 3... INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Starting evaluation at 2021-11-13T02:31:06 INFO:tensorflow:Graph was finalized. INFO:tensorflow:Restoring parameters from /tmp/tmpvfb91q_5/model.ckpt-3 INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Inference Time : 0.13630s INFO:tensorflow:Finished evaluation at 2021-11-13-02:31:06 INFO:tensorflow:Saving dict for global step 3: global_step = 3, loss = 0.005215075 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 3: /tmp/tmpvfb91q_5/model.ckpt-3 INFO:tensorflow:Loss for final step: 0.061832994. ({'loss': 0.005215075, 'global_step': 3}, [])
TensorFlow 2: formazione multi-lavoratore con strategie di distribuzione
In TensorFlow 2, la formazione distribuita tra più lavoratori con CPU, GPU e TPU viene eseguita tramite tf.distribute.Strategy
s.
L'esempio seguente mostra come usare due di queste strategie: tf.distribute.experimental.ParameterServerStrategy
e tf.distribute.MultiWorkerMirroredStrategy
, entrambe progettate per l'addestramento di CPU/GPU con più lavoratori.
ParameterServerStrategy
impiega un coordinatore ( 'chief'
), che lo rende più compatibile con l'ambiente in questo notebook Colab. Utilizzerai qui alcune utilità per impostare gli elementi di supporto essenziali per un'esperienza eseguibile qui: creerai un cluster in-process , in cui i thread vengono utilizzati per simulare i server dei parametri ( 'ps'
) e i lavoratori ( 'worker'
) . Per ulteriori informazioni sull'addestramento del server dei parametri, fare riferimento all'esercitazione sul server dei parametri con ParameterServerStrategy .
In questo esempio, definire prima la variabile di ambiente 'TF_CONFIG'
con un tf.distribute.cluster_resolver.TFConfigClusterResolver
per fornire le informazioni sul cluster. Se stai utilizzando un sistema di gestione del cluster per la tua formazione distribuita, controlla se fornisce 'TF_CONFIG'
per te, nel qual caso non è necessario impostare esplicitamente questa variabile di ambiente. (Ulteriori informazioni nella sezione Configurazione della variabile di ambiente 'TF_CONFIG'
nella guida Distributed training with TensorFlow .)
# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker
chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]
# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
'cluster': {
'chief': ["localhost:%s" % chief_port],
'worker': ["localhost:%s" % port for port in worker_ports],
'ps': ["localhost:%s" % port for port in ps_ports],
},
'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)
# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
Quindi, crea uno per uno tf.distribute.Server
per i worker e i server dei parametri:
# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4
for i in range(3):
tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name="worker",
task_index=i,
config=worker_config)
for i in range(2):
tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name="ps",
task_index=i)
Nella formazione distribuita nel mondo reale, invece di avviare tutti i tf.distribute.Server
sul coordinatore, utilizzerai più macchine e quelle designate come "worker"
e "ps"
(server dei parametri) eseguire un tf.distribute.Server
. Per ulteriori dettagli, fare riferimento alla sezione Cluster nel mondo reale nell'esercitazione di addestramento del server dei parametri .
Con tutto pronto, crea l'oggetto ParameterServerStrategy
:
strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0' INFO:tensorflow:Number of GPUs on workers: 1
Dopo aver creato un oggetto strategia, definire il modello, l'ottimizzatore e altre variabili e chiamare Keras Model.compile
all'interno dell'API Strategy.scope
per distribuire il training. (Per ulteriori informazioni, fare riferimento ai documenti dell'API Strategy.scope
.)
Se si preferisce personalizzare l'allenamento definendo, ad esempio, i passaggi in avanti e all'indietro, fare riferimento alla sezione Formazione con un ciclo di addestramento personalizzato nell'esercitazione di addestramento del server dei parametri per maggiori dettagli.
dataset = tf.data.Dataset.from_tensor_slices(
(features, labels)).shuffle(10).repeat().batch(64)
eval_dataset = tf.data.Dataset.from_tensor_slices(
(eval_features, eval_labels)).repeat().batch(1)
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
model.compile(optimizer, "mse")
model.fit(dataset, epochs=5, steps_per_epoch=10)
Epoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:453: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options. warnings.warn("To make it possible to preserve tf.data options across " INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2021-11-13 02:31:09.110074: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } 2021-11-13 02:31:09.115349: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } 2021-11-13 02:31:09.117963: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } 10/10 - 3s - loss: 7.4912 - 3s/epoch - 259ms/step Epoch 2/5 10/10 - 0s - loss: 3.3420 - 43ms/epoch - 4ms/step Epoch 3/5 10/10 - 0s - loss: 1.9022 - 44ms/epoch - 4ms/step Epoch 4/5 10/10 - 0s - loss: 1.1536 - 42ms/epoch - 4ms/step Epoch 5/5 10/10 - 0s - loss: 0.7208 - 43ms/epoch - 4ms/step <keras.callbacks.History at 0x7f45d83f3a50>
model.evaluate(eval_dataset, steps=10, return_dict=True)
1/10 [==>...........................] - ETA: 11s - loss: 2.4114 2021-11-13 02:31:10.757780: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:8" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } 2021-11-13 02:31:10.910985: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:8" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } 10/10 [==============================] - 2s 38ms/step - loss: 3.8431 2021-11-13 02:31:11.053772: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:8" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } {'loss': 3.843122}
Partizionatori (
tf.distribute.experimental.partitioners
)
ParameterServerStrategy
in TensorFlow 2 supporta il partizionamento delle variabili e offre gli stessi partizionatori di TensorFlow 1, con nomi meno confusi: -tf.compat.v1.variable_axis_size_partitioner
->tf.distribute.experimental.partitioners.MaxSizePartitioner
: un partizionatore che mantiene gli shard sotto una dimensione massima) . -tf.compat.v1.min_max_variable_partitioner
->tf.distribute.experimental.partitioners.MinSizePartitioner
: un partizionatore che alloca una dimensione minima per shard. -tf.compat.v1.fixed_size_partitioner
->tf.distribute.experimental.partitioners.FixedShardsPartitioner
: un partizionatore che alloca un numero fisso di shard.
In alternativa, puoi utilizzare un oggetto MultiWorkerMirroredStrategy
:
# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO
È possibile sostituire la strategia utilizzata sopra con un oggetto MultiWorkerMirroredStrategy
per eseguire l'addestramento con questa strategia.
Come con le API tf.estimator
, poiché MultiWorkerMirroredStrategy
è una strategia multi-client, non esiste un modo semplice per eseguire la formazione distribuita in questo notebook Colab. Pertanto, la sostituzione del codice sopra con questa strategia finisce per eseguire le cose localmente. I tutorial di formazione multi-lavoratore con Keras Model.fit/a ciclo di formazione personalizzato mostrano come eseguire la formazione multi-lavoratore con la variabile 'TF_CONFIG'
impostata, con due lavoratori su un localhost in Colab. In pratica, creeresti più lavoratori su indirizzi/porte IP esterni e utilizzeresti la variabile 'TF_CONFIG'
per specificare la configurazione del cluster per ogni lavoratore.
Prossimi passi
Per ulteriori informazioni sulla formazione distribuita multi-lavoratore con tf.distribute.experimental.ParameterServerStrategy
e tf.distribute.MultiWorkerMirroredStrategy
in TensorFlow 2, considera le seguenti risorse:
- Esercitazione: training del server dei parametri con ParameterServerStrategy e Keras Model.fit/a ciclo di training personalizzato
- Tutorial: formazione multi-lavoratore con MultiWorkerMirroredStrategy e Keras Model.fit
- Tutorial: formazione multi-lavoratore con MultiWorkerMirroredStrategy e un ciclo di formazione personalizzato
- Guida: formazione distribuita con TensorFlow
- Guida: ottimizza le prestazioni della GPU TensorFlow con TensorFlow Profiler
- Guida: utilizzare una GPU (la sezione Utilizzo di più GPU)