Xem trên TensorFlow.org | Chạy trong Google Colab | Xem nguồn trên GitHub | Tải xuống sổ ghi chép |
Tổng quat
Hướng dẫn này trình bày cách sử dụng tf.distribute.Strategy
để đào tạo nhiều nhân viên phân tán với tf.estimator
. Nếu bạn viết mã của mình bằng tf.estimator
và bạn quan tâm đến việc mở rộng ra ngoài một máy duy nhất với hiệu suất cao, thì hướng dẫn này là dành cho bạn.
Trước khi bắt đầu, vui lòng đọc hướng dẫn chiến lược phân phối . Hướng dẫn đào tạo đa GPU cũng có liên quan, vì hướng dẫn này sử dụng cùng một mô hình.
Thành lập
Đầu tiên, thiết lập TensorFlow và các lần nhập cần thiết.
import tensorflow_datasets as tfds
import tensorflow as tf
import os, json
tf.compat.v1.disable_eager_execution()
Chức năng đầu vào
Hướng dẫn này sử dụng tập dữ liệu MNIST từ TensorFlow Datasets . Đoạn mã ở đây tương tự như hướng dẫn đào tạo đa GPU với một điểm khác biệt chính: khi sử dụng Công cụ ước tính để đào tạo nhiều nhân viên, cần phải chia nhỏ tập dữ liệu theo số lượng nhân viên để đảm bảo mô hình hội tụ. Dữ liệu đầu vào được phân đoạn theo chỉ mục worker, để mỗi worker xử lý 1/num_workers
các phần riêng biệt của tập dữ liệu.
BUFFER_SIZE = 10000
BATCH_SIZE = 64
def input_fn(mode, input_context=None):
datasets, info = tfds.load(name='mnist',
with_info=True,
as_supervised=True)
mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
datasets['test'])
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label
if input_context:
mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
Một cách tiếp cận hợp lý khác để đạt được sự hội tụ là xáo trộn tập dữ liệu với các hạt giống riêng biệt ở mỗi công nhân.
Cấu hình nhiều nhân viên
Một trong những điểm khác biệt chính trong hướng dẫn này (so với hướng dẫn đào tạo đa GPU ) là thiết lập nhiều nhân viên. Biến môi trường TF_CONFIG
là cách tiêu chuẩn để chỉ định cấu hình cụm cho mỗi worker là một phần của cụm.
Có hai thành phần của TF_CONFIG
: cluster
và task
. cluster
cung cấp thông tin về toàn bộ cluster, cụ thể là các worker và các máy chủ tham số trong cluster. task
cung cấp thông tin về nhiệm vụ hiện tại. cluster
thành phần đầu tiên giống nhau đối với tất cả công nhân và máy chủ tham số trong cụm và task
thành phần thứ hai khác nhau trên mỗi công nhân và máy chủ tham số và chỉ định type
và index
riêng của nó. Trong ví dụ này, type
tác vụ là worker
và index
tác vụ là 0
.
Với mục đích minh họa, hướng dẫn này chỉ ra cách đặt TF_CONFIG
với 2 worker trên localhost
. Trong thực tế, bạn sẽ tạo nhiều công nhân trên một địa chỉ IP và cổng bên ngoài, đồng thời đặt TF_CONFIG
trên mỗi công nhân một cách thích hợp, tức là sửa đổi index
nhiệm vụ.
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:12345", "localhost:23456"]
},
'task': {'type': 'worker', 'index': 0}
})
Xác định mô hình
Viết các lớp, trình tối ưu hóa và hàm mất để đào tạo. Hướng dẫn này xác định mô hình bằng các lớp Keras, tương tự như hướng dẫn đào tạo đa GPU .
LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])
logits = model(features, training=False)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {'logits': logits}
return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)
optimizer = tf.compat.v1.train.GradientDescentOptimizer(
learning_rate=LEARNING_RATE)
loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(mode, loss=loss)
return tf.estimator.EstimatorSpec(
mode=mode,
loss=loss,
train_op=optimizer.minimize(
loss, tf.compat.v1.train.get_or_create_global_step()))
MultiWorkerMirroredStrategy
Để đào tạo mô hình, hãy sử dụng một phiên bản của tf.distribute.experimental.MultiWorkerMirroredStrategy
. MultiWorkerMirroredStrategy
tạo bản sao của tất cả các biến trong các lớp của mô hình trên mỗi thiết bị trên tất cả các công nhân. Nó sử dụng CollectiveOps
, một tùy chọn TensorFlow để giao tiếp tập thể, để tổng hợp các gradient và giữ cho các biến được đồng bộ hóa. Hướng dẫn tf.distribute.Strategy
có thêm chi tiết về chiến lược này.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_7505/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version. Instructions for updating: use distribute.MultiWorkerMirroredStrategy instead INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO
Đào tạo và đánh giá mô hình
Tiếp theo, chỉ định chiến lược phân phối trong RunConfig
cho công cụ ước tính, đào tạo và đánh giá bằng cách gọi tf.estimator.train_and_evaluate
. Hướng dẫn này chỉ phân phối đào tạo bằng cách chỉ định chiến lược qua train_distribute
. Cũng có thể phân phối đánh giá thông qua eval_distribute
.
config = tf.estimator.RunConfig(train_distribute=strategy)
classifier = tf.estimator.Estimator(
model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
classifier,
train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies. INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } , '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7f3404234490>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None} INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Running training and evaluation locally (non-distributed). INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/estimator.py:1244: StrategyBase.configure (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: use `update_config_proto` instead. INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy INFO:tensorflow:Calling model_fn. /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options. warnings.warn("To make it possible to preserve tf.data options across " INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Create CheckpointSaverHook. INFO:tensorflow:Create CheckpointSaverHook. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version. Instructions for updating: Use the iterator's `initializer` property instead. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version. Instructions for updating: Use the iterator's `initializer` property instead. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... 2022-01-26 05:29:43.503603: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} } . Registered: device='CPU' 2022-01-26 05:29:43.504873: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} } . Registered: device='CPU' INFO:tensorflow:loss = 2.292878, step = 0 INFO:tensorflow:loss = 2.292878, step = 0 INFO:tensorflow:global_step/sec: 173.275 INFO:tensorflow:global_step/sec: 173.275 INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec) INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec) INFO:tensorflow:global_step/sec: 189.057 INFO:tensorflow:global_step/sec: 189.057 INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec) INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec) INFO:tensorflow:global_step/sec: 193.075 INFO:tensorflow:global_step/sec: 193.075 INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec) INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec) INFO:tensorflow:global_step/sec: 199.957 INFO:tensorflow:global_step/sec: 199.957 INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec) INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec) INFO:tensorflow:global_step/sec: 204.217 INFO:tensorflow:global_step/sec: 204.217 INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec) INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec) INFO:tensorflow:global_step/sec: 201.747 INFO:tensorflow:global_step/sec: 201.747 INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec) INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec) INFO:tensorflow:global_step/sec: 206.079 INFO:tensorflow:global_step/sec: 206.079 INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec) INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec) INFO:tensorflow:global_step/sec: 231.299 INFO:tensorflow:global_step/sec: 231.299 INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec) INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec) INFO:tensorflow:global_step/sec: 657.044 INFO:tensorflow:global_step/sec: 657.044 INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec) INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec) INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938... INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938... INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938... INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938... INFO:tensorflow:Calling model_fn. INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56 INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56 INFO:tensorflow:Graph was finalized. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Running local_init_op. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Evaluation [10/100] INFO:tensorflow:Evaluation [10/100] INFO:tensorflow:Evaluation [20/100] INFO:tensorflow:Evaluation [20/100] INFO:tensorflow:Evaluation [30/100] INFO:tensorflow:Evaluation [30/100] INFO:tensorflow:Evaluation [40/100] INFO:tensorflow:Evaluation [40/100] INFO:tensorflow:Evaluation [50/100] INFO:tensorflow:Evaluation [50/100] INFO:tensorflow:Evaluation [60/100] INFO:tensorflow:Evaluation [60/100] INFO:tensorflow:Evaluation [70/100] INFO:tensorflow:Evaluation [70/100] INFO:tensorflow:Evaluation [80/100] INFO:tensorflow:Evaluation [80/100] INFO:tensorflow:Evaluation [90/100] INFO:tensorflow:Evaluation [90/100] INFO:tensorflow:Evaluation [100/100] INFO:tensorflow:Evaluation [100/100] INFO:tensorflow:Inference Time : 2.04637s INFO:tensorflow:Inference Time : 2.04637s INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58 INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58 INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131 INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Loss for final step: 1.10881. INFO:tensorflow:Loss for final step: 1.10881. ({'loss': 2.234131, 'global_step': 938}, [])
Tối ưu hóa hiệu suất đào tạo
Bây giờ bạn có một mô hình và một Công cụ ước tính có nhiều nhân viên được hỗ trợ bởi tf.distribute.Strategy
. Bạn có thể thử các kỹ thuật sau để tối ưu hóa hiệu suất của đào tạo nhiều nhân viên:
- Tăng kích thước lô: Kích thước lô được chỉ định ở đây là trên mỗi GPU. Nói chung, kích thước lô lớn nhất phù hợp với bộ nhớ GPU được khuyến khích.
- Truyền biến: Truyền các biến tới
tf.float
nếu có thể. Mô hình ResNet chính thức bao gồm một ví dụ về cách có thể thực hiện điều này. Sử dụng giao tiếp tập thể:
MultiWorkerMirroredStrategy
cung cấp nhiều triển khai giao tiếp tập thể .-
RING
triển khai các tập thể dựa trên vòng sử dụng gRPC làm lớp giao tiếp giữa các máy chủ. -
NCCL
sử dụng NCCL của Nvidia để thực hiện các tập thể. -
AUTO
định nghĩa sự lựa chọn trong thời gian chạy.
Sự lựa chọn tốt nhất của việc triển khai tập thể phụ thuộc vào số lượng và loại GPU cũng như kết nối mạng trong cụm. Để ghi đè lựa chọn tự động, hãy chỉ định một giá trị hợp lệ cho tham số
communication
của phương thứcMultiWorkerMirroredStrategy
, ví dụ:communication=tf.distribute.experimental.CollectiveCommunication.NCCL
.-
Truy cập phần Hiệu suất trong hướng dẫn để tìm hiểu thêm về các chiến lược và công cụ khác mà bạn có thể sử dụng để tối ưu hóa hiệu suất của các mô hình TensorFlow của mình.
Các ví dụ mã khác
- Ví dụ từ đầu đến cuối để đào tạo nhiều nhân viên trong tensorflow / hệ sinh thái bằng cách sử dụng các mẫu Kubernetes. Ví dụ này bắt đầu với mô hình Keras và chuyển đổi nó thành Công cụ ước tính bằng cách sử dụng API
tf.keras.estimator.model_to_estimator
. - Các mô hình chính thức , nhiều mô hình trong số đó có thể được định cấu hình để chạy nhiều chiến lược phân phối.