Veja no TensorFlow.org | Executar no Google Colab | Ver fonte no GitHub | Baixar caderno |
Este guia demonstra como migrar seu fluxo de trabalho de treinamento distribuído para vários funcionários do TensorFlow 1 para o TensorFlow 2.
Para realizar o treinamento de vários trabalhadores com CPUs/GPUs:
- No TensorFlow 1, você tradicionalmente usa as APIs
tf.estimator.train_and_evaluate
etf.estimator.Estimator
. - No TensorFlow 2, use as APIs Keras para escrever o modelo, a função de perda, o otimizador e as métricas. Em seguida, distribua o treinamento com a API Keras
Model.fit
ou um loop de treinamento personalizado (comtf.GradientTape
) entre vários trabalhadores comtf.distribute.experimental.ParameterServerStrategy
outf.distribute.MultiWorkerMirroredStrategy
. Para obter mais detalhes, consulte os seguintes tutoriais:
Configurar
Comece com algumas importações necessárias e um conjunto de dados simples para fins de demonstração:
# The notebook uses a dataset instance for `Model.fit` with
# `ParameterServerStrategy`, which depends on symbols in TF 2.7.
# Install a utility needed for this demonstration
!pip install portpicker
import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]
Você precisará da variável de ambiente de configuração 'TF_CONFIG'
para treinar em várias máquinas no TensorFlow. Use 'TF_CONFIG'
para especificar os endereços 'cluster'
e 'task'
s'. (Saiba mais no guia Distributed_training .)
import json
import os
tf_config = {
'cluster': {
'chief': ['localhost:11111'],
'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
'ps': ['localhost:12121', 'localhost:13131'],
},
'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Use a instrução del
para remover a variável (mas no treinamento de vários trabalhadores do mundo real no TensorFlow 1, você não precisará fazer isso):
del os.environ['TF_CONFIG']
TensorFlow 1: treinamento distribuído para vários trabalhadores com APIs tf.estimator
O snippet de código a seguir demonstra o fluxo de trabalho canônico do treinamento de vários trabalhadores no TF1: você usará um tf.estimator.Estimator
, um tf.estimator.TrainSpec
, um tf.estimator.EvalSpec
e a API tf.estimator.train_and_evaluate
para distribuir o treinamento:
def _input_fn():
return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)
def _eval_input_fn():
return tf1.data.Dataset.from_tensor_slices(
(eval_features, eval_labels)).batch(1)
def _model_fn(features, labels, mode):
logits = tf1.layers.Dense(1)(features)
loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
optimizer = tf1.train.AdagradOptimizer(0.05)
train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
INFO:tensorflow:Using default config. WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpvfb91q_5 INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpvfb91q_5', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } , '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1} INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Running training and evaluation locally (non-distributed). INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/training_util.py:401: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version. Instructions for updating: Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts. INFO:tensorflow:Calling model_fn. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/adagrad.py:143: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version. Instructions for updating: Call initializer instance with the dtype argument instead of passing it to the constructor INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Create CheckpointSaverHook. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpvfb91q_5/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... INFO:tensorflow:loss = 0.038075272, step = 0 INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 3... INFO:tensorflow:Saving checkpoints for 3 into /tmp/tmpvfb91q_5/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 3... INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Starting evaluation at 2021-11-13T02:31:06 INFO:tensorflow:Graph was finalized. INFO:tensorflow:Restoring parameters from /tmp/tmpvfb91q_5/model.ckpt-3 INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Inference Time : 0.13630s INFO:tensorflow:Finished evaluation at 2021-11-13-02:31:06 INFO:tensorflow:Saving dict for global step 3: global_step = 3, loss = 0.005215075 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 3: /tmp/tmpvfb91q_5/model.ckpt-3 INFO:tensorflow:Loss for final step: 0.061832994. ({'loss': 0.005215075, 'global_step': 3}, [])
TensorFlow 2: treinamento para vários trabalhadores com estratégias de distribuição
No TensorFlow 2, o treinamento distribuído entre vários trabalhadores com CPUs, GPUs e TPUs é feito por meio tf.distribute.Strategy
s.
O exemplo a seguir demonstra como usar duas dessas estratégias: tf.distribute.experimental.ParameterServerStrategy
e tf.distribute.MultiWorkerMirroredStrategy
, ambas projetadas para treinamento de CPU/GPU com vários trabalhadores.
O ParameterServerStrategy
emprega um coordenador ( 'chief'
), o que o torna mais amigável com o ambiente neste notebook Colab. Você usará alguns utilitários aqui para configurar os elementos de suporte essenciais para uma experiência executável aqui: você criará um cluster em processo , onde as threads são usadas para simular os servidores de parâmetro ( 'ps'
) e os trabalhadores ( 'worker'
) . Para obter mais informações sobre o treinamento do servidor de parâmetros, consulte o tutorial Treinamento do servidor de parâmetros com ParameterServerStrategy .
Neste exemplo, primeiro defina a variável de ambiente 'TF_CONFIG'
com um tf.distribute.cluster_resolver.TFConfigClusterResolver
para fornecer as informações do cluster. Se você estiver usando um sistema de gerenciamento de cluster para seu treinamento distribuído, verifique se ele já fornece 'TF_CONFIG'
para você, nesse caso você não precisa definir explicitamente essa variável de ambiente. (Saiba mais na seção Configurando a variável de ambiente 'TF_CONFIG'
no guia Treinamento distribuído com TensorFlow .)
# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker
chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]
# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
'cluster': {
'chief': ["localhost:%s" % chief_port],
'worker': ["localhost:%s" % port for port in worker_ports],
'ps': ["localhost:%s" % port for port in ps_ports],
},
'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)
# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
Em seguida, crie tf.distribute.Server
s para os workers e os servidores de parâmetros um por um:
# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4
for i in range(3):
tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name="worker",
task_index=i,
config=worker_config)
for i in range(2):
tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name="ps",
task_index=i)
No treinamento distribuído do mundo real, em vez de iniciar todos os tf.distribute.Server
s no coordenador, você usará várias máquinas, e as designadas como "worker"
s e "ps"
(servidores de parâmetros) serão cada uma execute um tf.distribute.Server
. Consulte a seção Clusters no mundo real no tutorial de treinamento do servidor de parâmetros para obter mais detalhes.
Com tudo pronto, crie o objeto ParameterServerStrategy
:
strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0' INFO:tensorflow:Number of GPUs on workers: 1
Depois de criar um objeto de estratégia, defina o modelo, o otimizador e outras variáveis e chame o Keras Model.compile
dentro da API Strategy.scope
para distribuir o treinamento. (Consulte os documentos da API Strategy.scope
para obter mais informações.)
Se você preferir personalizar seu treinamento, por exemplo, definindo os passos para frente e para trás, consulte a seção Treinamento com um loop de treinamento personalizado no tutorial de treinamento do servidor de parâmetros para obter mais detalhes.
dataset = tf.data.Dataset.from_tensor_slices(
(features, labels)).shuffle(10).repeat().batch(64)
eval_dataset = tf.data.Dataset.from_tensor_slices(
(eval_features, eval_labels)).repeat().batch(1)
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
model.compile(optimizer, "mse")
model.fit(dataset, epochs=5, steps_per_epoch=10)
Epoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:453: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options. warnings.warn("To make it possible to preserve tf.data options across " INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2021-11-13 02:31:09.110074: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } 2021-11-13 02:31:09.115349: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } 2021-11-13 02:31:09.117963: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } 10/10 - 3s - loss: 7.4912 - 3s/epoch - 259ms/step Epoch 2/5 10/10 - 0s - loss: 3.3420 - 43ms/epoch - 4ms/step Epoch 3/5 10/10 - 0s - loss: 1.9022 - 44ms/epoch - 4ms/step Epoch 4/5 10/10 - 0s - loss: 1.1536 - 42ms/epoch - 4ms/step Epoch 5/5 10/10 - 0s - loss: 0.7208 - 43ms/epoch - 4ms/step <keras.callbacks.History at 0x7f45d83f3a50>
model.evaluate(eval_dataset, steps=10, return_dict=True)
1/10 [==>...........................] - ETA: 11s - loss: 2.4114 2021-11-13 02:31:10.757780: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:8" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } 2021-11-13 02:31:10.910985: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:8" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } 10/10 [==============================] - 2s 38ms/step - loss: 3.8431 2021-11-13 02:31:11.053772: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 3 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:8" } } attr { key: "output_shapes" value { list { shape { dim { size: 2 } } shape { dim { size: 1 } } } } } {'loss': 3.843122}
Particionadores (
tf.distribute.experimental.partitioners
)O
ParameterServerStrategy
no TensorFlow 2 oferece suporte ao particionamento variável e oferece os mesmos particionadores do TensorFlow 1, com nomes menos confusos: -tf.compat.v1.variable_axis_size_partitioner
->tf.distribute.experimental.partitioners.MaxSizePartitioner
: um particionador que mantém os fragmentos em um tamanho máximo) . -tf.compat.v1.min_max_variable_partitioner
->tf.distribute.experimental.partitioners.MinSizePartitioner
: um particionador que aloca um tamanho mínimo por shard. -tf.compat.v1.fixed_size_partitioner
->tf.distribute.experimental.partitioners.FixedShardsPartitioner
: um particionador que aloca um número fixo de shards.
Como alternativa, você pode usar um objeto MultiWorkerMirroredStrategy
:
# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO
Você pode substituir a estratégia usada acima por um objeto MultiWorkerMirroredStrategy
para realizar o treinamento com essa estratégia.
Assim como as APIs tf.estimator
, como o MultiWorkerMirroredStrategy
é uma estratégia multicliente, não há uma maneira fácil de executar treinamento distribuído neste notebook Colab. Portanto, substituir o código acima por essa estratégia acaba executando as coisas localmente. Os tutoriais de treinamento de vários trabalhadores com Keras Model.fit / um loop de treinamento personalizado demonstram como executar o treinamento de vários trabalhadores com a configuração da variável 'TF_CONFIG'
, com dois trabalhadores em um host local no Colab. Na prática, você criaria vários trabalhadores em endereços/portas IP externos e usaria a variável 'TF_CONFIG'
para especificar a configuração do cluster para cada trabalhador.
Próximos passos
Para saber mais sobre o treinamento distribuído de vários trabalhadores com tf.distribute.experimental.ParameterServerStrategy
e tf.distribute.MultiWorkerMirroredStrategy
no TensorFlow 2, considere os seguintes recursos:
- Tutorial: Treinamento do servidor de parâmetros com ParameterServerStrategy e Keras Model.fit/um loop de treinamento personalizado
- Tutorial: Treinamento de vários trabalhadores com MultiWorkerMirroredStrategy e Keras Model.fit
- Tutorial: Treinamento de vários trabalhadores com MultiWorkerMirroredStrategy e um loop de treinamento personalizado
- Guia: treinamento distribuído com TensorFlow
- Guia: Otimize o desempenho da GPU do TensorFlow com o TensorFlow Profiler
- Guia: Use uma GPU (a seção Usando várias GPUs)