Xem trên TensorFlow.org | Chạy trong Google Colab | Xem nguồn trên GitHub | Tải xuống sổ ghi chép |
Tổng quat
Hướng dẫn này trình bày cách thực hiện đào tạo phân tán nhiều nhân viên với mô hình Keras và API Model.fit
bằng cách sử dụng API tf.distribute.Strategy
— cụ thể là lớp tf.distribute.MultiWorkerMirroredStrategy
. Với sự trợ giúp của chiến lược này, mô hình Keras được thiết kế để chạy trên một nhân viên duy nhất có thể làm việc liền mạch trên nhiều nhân viên với những thay đổi mã tối thiểu.
Đối với những người quan tâm đến việc hiểu sâu hơn về các API tf.distribute.Strategy
, chương trình đào tạo Phân tán trong hướng dẫn TensorFlow có sẵn để biết tổng quan về các chiến lược phân phối mà TensorFlow hỗ trợ.
Để tìm hiểu cách sử dụng MultiWorkerMirroredStrategy
với Keras và vòng đào tạo tùy chỉnh, hãy tham khảo Vòng đào tạo tùy chỉnh với Keras và MultiWorkerMirroredStrategy .
Lưu ý rằng mục đích của hướng dẫn này là để chứng minh một ví dụ tối thiểu về nhiều công nhân với hai công nhân.
Thành lập
Bắt đầu với một số nhập cần thiết:
import json
import os
import sys
Trước khi nhập TensorFlow, hãy thực hiện một số thay đổi đối với môi trường:
- Tắt tất cả các GPU. Điều này ngăn ngừa các lỗi gây ra bởi tất cả các công nhân đang cố gắng sử dụng cùng một GPU. Trong một ứng dụng thế giới thực, mỗi công nhân sẽ làm việc trên một máy khác nhau.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
- Đặt lại biến môi trường
TF_CONFIG
(bạn sẽ tìm hiểu thêm về điều này sau):
os.environ.pop('TF_CONFIG', None)
- Đảm bảo rằng thư mục hiện tại nằm trên đường dẫn của Python — điều này cho phép sổ ghi chép nhập các tệp được viết bởi
%%writefile
sau:
if '.' not in sys.path:
sys.path.insert(0, '.')
Bây giờ nhập TensorFlow:
import tensorflow as tf
Tập dữ liệu và định nghĩa mô hình
Tiếp theo, tạo tệp mnist_setup.py
với thiết lập mô hình và tập dữ liệu đơn giản. Tệp Python này sẽ được sử dụng bởi các quy trình công nhân trong hướng dẫn này:
%%writefile mnist_setup.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the [0, 255] range.
# You need to convert them to float32 with values in the [0, 1] range.
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
return train_dataset
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
Writing mnist_setup.py
Đào tạo mô hình trên một công nhân duy nhất
Hãy thử đào tạo mô hình cho một số lượng nhỏ kỷ nguyên và quan sát kết quả của một công nhân để đảm bảo mọi thứ hoạt động chính xác. Khi quá trình đào tạo tiến triển, tổn thất sẽ giảm xuống và độ chính xác sẽ tăng lên.
import mnist_setup
batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz 11493376/11490434 [==============================] - 0s 0us/step 11501568/11490434 [==============================] - 0s 0us/step 2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected Epoch 1/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788 Epoch 2/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185 Epoch 3/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795 <keras.callbacks.History at 0x7f666a2e4510>
Cấu hình nhiều nhân viên
Bây giờ chúng ta hãy bước vào thế giới của đào tạo nhiều nhân viên.
Một cụm với các công việc và nhiệm vụ
Trong TensorFlow, đào tạo phân tán bao gồm: một 'cluster'
với một số công việc và mỗi công việc có thể có một hoặc nhiều 'task'
.
Bạn sẽ cần biến môi trường cấu hình TF_CONFIG
để đào tạo trên nhiều máy, mỗi máy có thể có một vai trò khác nhau. TF_CONFIG
là một chuỗi JSON được sử dụng để chỉ định cấu hình cụm cho mỗi worker là một phần của cụm.
Có hai thành phần của một biến TF_CONFIG
: 'cluster'
và 'task'
.
Một
'cluster'
giống nhau đối với tất cả công nhân và cung cấp thông tin về nhóm đào tạo, là một mệnh lệnh bao gồm các loại công việc khác nhau, chẳng hạn như'worker'
hoặc'chief'
.- Trong đào tạo nhiều nhân viên với
tf.distribute.MultiWorkerMirroredStrategy
, thường có một'worker'
đảm nhận các trách nhiệm, chẳng hạn như lưu điểm kiểm tra và viết tệp tóm tắt cho TensorBoard, ngoài những việc mà một'worker'
thông thường làm.'worker'
đó được gọi là công nhân trưởng (với tên công việc là'chief'
). - Theo thông lệ,
'chief'
phải chỉ định'index'
0
(trên thực tế, đây là cách thực hiệntf.distribute.Strategy
).
- Trong đào tạo nhiều nhân viên với
Một
'task'
cung cấp thông tin về nhiệm vụ hiện tại và khác nhau đối với từng công nhân. Nó chỉ định'type'
và'index'
của công nhân đó.
Dưới đây là một cấu hình ví dụ:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
Đây là cùng một TF_CONFIG
được tuần tự hóa thành một chuỗi JSON:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
Lưu ý rằng tf_config
chỉ là một biến cục bộ trong Python. Để có thể sử dụng nó cho cấu hình đào tạo, lệnh này cần được tuần tự hóa dưới dạng JSON và được đặt trong một biến môi trường TF_CONFIG
.
Trong cấu hình ví dụ ở trên, bạn đặt nhiệm vụ 'type'
thành 'worker'
và 'index'
nhiệm vụ thành 0
. Do đó, chiếc máy này là công nhân đầu tiên . Nó sẽ được bổ nhiệm làm công nhân 'chief'
và làm nhiều việc hơn những người khác.
Với mục đích minh họa, hướng dẫn này chỉ ra cách bạn có thể thiết lập một biến TF_CONFIG
với hai công nhân trên một localhost
.
Trong thực tế, bạn sẽ tạo nhiều công nhân trên các địa chỉ / cổng IP bên ngoài và đặt biến TF_CONFIG
trên mỗi công nhân tương ứng.
Trong hướng dẫn này, bạn sẽ sử dụng hai công nhân:
-
TF_CONFIG
của công nhân đầu tiên ('chief'
) được hiển thị ở trên. - Đối với công nhân thứ hai, bạn sẽ đặt
tf_config['task']['index']=1
Biến môi trường và quy trình con trong sổ ghi chép
Các quy trình con kế thừa các biến môi trường từ cha của chúng.
Ví dụ: bạn có thể đặt một biến môi trường trong quy trình Máy tính xách tay Jupyter này như sau:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
Sau đó, bạn có thể truy cập biến môi trường từ một quy trình con:
echo ${GREETINGS}
Hello TensorFlow!
Trong phần tiếp theo, bạn sẽ sử dụng một phương pháp tương tự để chuyển TF_CONFIG
đến các quy trình con của worker. Trong một tình huống thực tế, bạn sẽ không bắt đầu công việc của mình theo cách này, nhưng trong ví dụ này là đủ.
Chọn chiến lược phù hợp
Trong TensorFlow, có hai hình thức đào tạo phân tán chính:
- Đào tạo đồng bộ , trong đó các bước đào tạo được đồng bộ hóa giữa các công nhân và bản sao, và
- Đào tạo không đồng bộ , trong đó các bước đào tạo không được đồng bộ hóa nghiêm ngặt (ví dụ: đào tạo máy chủ tham số ).
Hướng dẫn này trình bày cách thực hiện đào tạo nhiều nhân viên đồng bộ bằng cách sử dụng một phiên bản của tf.distribute.MultiWorkerMirroredStrategy
.
MultiWorkerMirroredStrategy
tạo bản sao của tất cả các biến trong các lớp của mô hình trên mỗi thiết bị trên tất cả các công nhân. Nó sử dụng CollectiveOps
, một tùy chọn TensorFlow để giao tiếp tập thể, để tổng hợp các gradient và giữ cho các biến được đồng bộ hóa. Hướng dẫn tf.distribute.Strategy
có thêm chi tiết về chiến lược này.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
MultiWorkerMirroredStrategy
cung cấp nhiều triển khai thông qua tham số tf.distribute.experimental.CommunicationOptions
: 1) RING
triển khai các tập thể dựa trên vòng sử dụng gRPC làm lớp giao tiếp trên nhiều máy chủ; 2) NCCL
sử dụng Thư viện liên lạc tập thể NVIDIA để thực hiện tập thể; và 3) AUTO
định nghĩa sự lựa chọn trong thời gian chạy. Sự lựa chọn tốt nhất của việc triển khai tập thể phụ thuộc vào số lượng và loại GPU cũng như kết nối mạng trong cụm.
Đào tạo mô hình
Với việc tích hợp API tf.distribute.Strategy
vào tf.keras
, thay đổi duy nhất bạn sẽ thực hiện để phân phối đào tạo cho nhiều nhân viên là bao gồm việc xây dựng mô hình và cuộc gọi model.compile()
bên trong strategy.scope()
. Phạm vi của chiến lược phân phối quy định cách thức và vị trí các biến được tạo, và trong trường hợp của MultiWorkerMirroredStrategy
, các biến được tạo là MirroredVariable
s và chúng được sao chép trên từng công nhân.
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
Để thực sự chạy với MultiWorkerMirroredStrategy
, bạn sẽ cần chạy các quy trình công nhân và chuyển TF_CONFIG
cho chúng.
Giống như tệp mnist_setup.py
được viết trước đó, đây là main.py
mà mỗi công nhân sẽ chạy:
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist_setup
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
strategy = tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py
Trong đoạn mã trên, lưu ý rằng global_batch_size
, được chuyển đến Dataset.batch
, được đặt thành per_worker_batch_size * num_workers
. Điều này đảm bảo rằng mỗi công nhân xử lý hàng loạt ví dụ per_worker_batch_size
bất kể số lượng công nhân.
Thư mục hiện tại hiện chứa cả hai tệp Python:
ls *.py
main.py mnist_setup.py
Vì vậy, json-serialize TF_CONFIG
và thêm nó vào các biến môi trường:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Bây giờ, bạn có thể khởi chạy một quy trình công nhân sẽ chạy main.py
và sử dụng TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
Có một số điều cần lưu ý về lệnh trên:
- Nó sử dụng
%%bash
là một "phép thuật" sổ ghi chép để chạy một số lệnh bash. - Nó sử dụng cờ
--bg
để chạy quá trìnhbash
trong nền, bởi vì công nhân này sẽ không kết thúc. Nó đợi tất cả các công nhân trước khi nó bắt đầu.
Quá trình công nhân nền sẽ không in kết quả ra sổ ghi chép này, vì vậy &>
chuyển hướng đầu ra của nó thành một tệp để bạn có thể kiểm tra những gì đã xảy ra trong tệp nhật ký sau này.
Vì vậy, hãy đợi vài giây để quá trình bắt đầu:
import time
time.sleep(10)
Bây giờ, hãy kiểm tra những gì đã được xuất vào tệp nhật ký của worker cho đến nay:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Dòng cuối cùng của tệp nhật ký sẽ có nội dung: Started server with target: grpc://localhost:12345
. Công nhân đầu tiên hiện đã sẵn sàng và đang đợi tất cả (các) công nhân khác sẵn sàng để tiếp tục.
Vì vậy, hãy cập nhật tf_config
cho quy trình của worker thứ hai để nhận:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Khởi chạy công nhân thứ hai. Điều này sẽ bắt đầu đào tạo vì tất cả công nhân đều đang hoạt động (vì vậy không cần phải làm nền tảng cho quá trình này):
python main.py
Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901 2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Nếu bạn kiểm tra lại nhật ký được viết bởi công nhân đầu tiên, bạn sẽ biết rằng nó đã tham gia đào tạo mô hình đó:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
Không có gì đáng ngạc nhiên, điều này chạy chậm hơn so với chạy thử nghiệm ở phần đầu của hướng dẫn này.
Chạy nhiều công nhân trên một máy chỉ làm tăng thêm chi phí.
Mục tiêu ở đây không phải là cải thiện thời gian đào tạo, mà chỉ đưa ra một ví dụ về đào tạo nhiều công nhân.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
Đào tạo chuyên sâu cho nhiều nhân viên
Cho đến nay, bạn đã học cách thực hiện thiết lập nhiều nhân viên cơ bản.
Trong phần còn lại của hướng dẫn, bạn sẽ tìm hiểu chi tiết về các yếu tố khác, có thể hữu ích hoặc quan trọng đối với các trường hợp sử dụng thực tế.
Sharding tập dữ liệu
Trong đào tạo nhiều nhân viên, phân tích tập dữ liệu là cần thiết để đảm bảo sự hội tụ và hiệu suất.
Ví dụ trong phần trước dựa trên tự động sạc mặc định được cung cấp bởi API tf.distribute.Strategy
. Bạn có thể kiểm soát sharding bằng cách đặt tf.data.experimental.AutoShardPolicy
của tf.data.experimental.DistributeOptions
.
Để tìm hiểu thêm về tính năng tự động làm sắc nét, hãy tham khảo Hướng dẫn nhập phân tán .
Dưới đây là một ví dụ nhanh về cách tắt chế độ tự động làm sắc nét để mỗi bản sao xử lý mọi ví dụ ( không được khuyến nghị ):
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
Đánh giá
Nếu bạn chuyển validation_data
vào Model.fit
, nó sẽ luân phiên giữa đào tạo và đánh giá cho mỗi kỷ nguyên. Đánh giá lấy dữ liệu validation_data
được phân phối trên cùng một nhóm công nhân và kết quả đánh giá được tổng hợp và có sẵn cho tất cả công nhân.
Tương tự như đào tạo, tập dữ liệu xác thực được tự động chia nhỏ ở cấp tệp. Bạn cần đặt kích thước lô chung trong tập dữ liệu xác thực và đặt bước validation_steps
.
Một tập dữ liệu lặp lại cũng được khuyến nghị để đánh giá.
Ngoài ra, bạn cũng có thể tạo một tác vụ khác định kỳ đọc các điểm kiểm tra và chạy đánh giá. Đây là những gì Công cụ ước tính làm. Nhưng đây không phải là cách được khuyến nghị để thực hiện đánh giá và do đó các chi tiết của nó bị bỏ qua.
Màn biểu diễn
Bây giờ bạn có một mô hình Keras đã được thiết lập để chạy trong nhiều nhân viên với MultiWorkerMirroredStrategy
.
Để điều chỉnh hiệu suất của đào tạo nhiều nhân viên, bạn có thể thử các cách sau:
tf.distribute.MultiWorkerMirroredStrategy
cung cấp nhiều triển khai giao tiếp tập thể :-
RING
triển khai các tập thể dựa trên vòng sử dụng gRPC làm lớp giao tiếp giữa các máy chủ. -
NCCL
sử dụng Thư viện liên lạc tập thể NVIDIA để thực hiện tập thể. -
AUTO
định nghĩa sự lựa chọn trong thời gian chạy.
Sự lựa chọn tốt nhất của việc triển khai tập thể phụ thuộc vào số lượng GPU, loại GPU và kết nối mạng trong cụm. Để ghi đè lựa chọn tự động, hãy chỉ định tham số
communication_options
của phương thức khởi tạo củaMultiWorkerMirroredStrategy
. Ví dụ:communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
-
Truyền các biến tới
tf.float
nếu có thể:- Mô hình ResNet chính thức bao gồm một ví dụ về cách có thể thực hiện điều này.
Khả năng chịu lỗi
Trong đào tạo đồng bộ, cụm sẽ thất bại nếu một trong các công nhân bị lỗi và không tồn tại cơ chế khắc phục lỗi.
Sử dụng Keras với tf.distribute.Strategy
có lợi thế về khả năng chịu lỗi trong trường hợp công nhân chết hoặc không ổn định. Bạn có thể thực hiện việc này bằng cách duy trì trạng thái đào tạo trong hệ thống tệp phân tán mà bạn chọn, sao cho khi khởi động lại phiên bản mà trước đó không thành công hoặc đã sử dụng trước, trạng thái đào tạo được phục hồi.
Khi một nhân viên không có mặt, các nhân viên khác sẽ không thực hiện được (có thể sau khi hết thời gian chờ). Trong những trường hợp như vậy, cần khởi động lại công nhân không khả dụng, cũng như các công nhân khác đã bị lỗi.
Gọi lại ModelCheckpoint
Cuộc gọi lại ModelCheckpoint
không còn cung cấp chức năng chịu lỗi nữa, hãy sử dụng lệnh gọi lại BackupAndRestore
để thay thế.
Lệnh gọi lại ModelCheckpoint
vẫn có thể được sử dụng để lưu các điểm kiểm tra. Nhưng với điều này, nếu quá trình đào tạo bị gián đoạn hoặc kết thúc thành công, để tiếp tục đào tạo từ điểm kiểm tra, người dùng có trách nhiệm tải mô hình theo cách thủ công.
Theo tùy chọn, người dùng có thể chọn lưu và khôi phục mô hình / trọng số bên ngoài lệnh gọi lại ModelCheckpoint
.
Lưu và tải mô hình
Để lưu mô hình của bạn bằng cách sử dụng model.save
hoặc tf.saved_model.save
, đích lưu cần phải khác nhau đối với mỗi nhân viên.
- Đối với những người không phải là công nhân chính, bạn sẽ cần phải lưu mô hình vào một thư mục tạm thời.
- Đối với trưởng, bạn sẽ cần phải lưu vào thư mục mô hình được cung cấp.
Các thư mục tạm thời trên worker cần phải là duy nhất để tránh lỗi do nhiều worker cố gắng ghi vào cùng một vị trí.
Mô hình được lưu trong tất cả các thư mục là giống hệt nhau và thường chỉ mô hình được lưu bởi trưởng mới được tham chiếu để khôi phục hoặc phục vụ.
Bạn nên có một số logic dọn dẹp để xóa các thư mục tạm thời được tạo bởi công nhân sau khi quá trình đào tạo của bạn đã hoàn thành.
Lý do để tiết kiệm đồng thời cho trưởng và công nhân là vì bạn có thể đang tổng hợp các biến trong quá trình kiểm tra, điều này yêu cầu cả trưởng và công nhân tham gia vào giao thức giao tiếp allreduce. Mặt khác, việc để trưởng và công nhân lưu vào cùng một thư mục mô hình sẽ dẫn đến sai sót do tranh chấp.
Sử dụng MultiWorkerMirroredStrategy
, chương trình được chạy trên mọi worker và để biết liệu worker hiện tại có phải là trưởng hay không, nó sẽ tận dụng đối tượng trình phân giải cụm có các thuộc tính task_type
và task_id
:
-
task_type
cho bạn biết công việc hiện tại là gì (ví dụ:'worker'
). -
task_id
cho bạn biết mã định danh của worker. - Công nhân có
task_id == 0
được chỉ định là công nhân chính.
Trong đoạn mã bên dưới, hàm write_filepath
cung cấp đường dẫn tệp để ghi, đường dẫn này phụ thuộc vào task_id
của worker:
- Đối với công nhân trưởng (với
task_id == 0
), nó ghi vào đường dẫn tệp gốc. - Đối với các worker khác, nó tạo một thư mục tạm
temp_dir
—vớitask_id
trong đường dẫn thư mục để ghi vào:
model_path = '/tmp/keras-model'
def _is_chief(task_type, task_id):
# Note: there are two possible `TF_CONFIG` configuration.
# 1) In addition to `worker` tasks, a `chief` task type is use;
# in this case, this function should be modified to
# `return task_type == 'chief'`.
# 2) Only `worker` task type is used; in this case, worker 0 is
# regarded as the chief. The implementation demonstrated here
# is for this case.
# For the purpose of this Colab section, the `task_type is None` case
# is added because it is effectively run with only a single worker.
return (task_type == 'worker' and task_id == 0) or task_type is None
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
Với điều đó, bây giờ bạn đã sẵn sàng để tiết kiệm:
multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/keras-model/assets INFO:tensorflow:Assets written to: /tmp/keras-model/assets
Như đã mô tả ở trên, về sau, mô hình chỉ nên được tải từ đường dẫn trưởng được lưu vào, vì vậy hãy xóa các mô hình tạm thời mà không phải công nhân trưởng đã lưu:
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(os.path.dirname(write_model_path))
Bây giờ, khi đã đến lúc tải, hãy sử dụng API tf.keras.models.load_model
tiện lợi và tiếp tục với công việc tiếp theo.
Ở đây, giả sử chỉ sử dụng một nhân viên duy nhất để tải và tiếp tục đào tạo, trong trường hợp đó, bạn không gọi tf.keras.models.load_model
trong một strategy.scope()
(lưu ý rằng strategy = tf.distribute.MultiWorkerMirroredStrategy()
, như đã định nghĩa trước đó ):
loaded_model = tf.keras.models.load_model(model_path)
# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2 20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773 <keras.callbacks.History at 0x7f6669989750>
Lưu và khôi phục điểm kiểm tra
Mặt khác, điểm kiểm tra cho phép bạn lưu trọng lượng của mô hình và khôi phục chúng mà không cần phải lưu toàn bộ mô hình.
Tại đây, bạn sẽ tạo một tf.train.Checkpoint
theo dõi mô hình, được quản lý bởi tf.train.CheckpointManager
, để chỉ có điểm kiểm tra mới nhất được giữ nguyên:
checkpoint_dir = '/tmp/ckpt'
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
Khi CheckpointManager
được thiết lập, bây giờ bạn đã sẵn sàng để lưu và xóa các trạm kiểm soát mà những người không phải là nhân viên chính đã lưu:
checkpoint_manager.save()
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(write_checkpoint_dir)
Bây giờ, khi bạn cần khôi phục mô hình, bạn có thể tìm thấy điểm kiểm tra mới nhất được lưu bằng cách sử dụng hàm tf.train.latest_checkpoint
tiện lợi. Sau khi khôi phục điểm kiểm tra, bạn có thể tiếp tục đào tạo.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/2 2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. 20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938 <keras.callbacks.History at 0x7f6669589850>
Gọi lại BackupAndRestore
Lệnh gọi lại tf.keras.callbacks.BackupAndRestore
cung cấp chức năng chịu lỗi bằng cách sao lưu mô hình và số kỷ nguyên hiện tại trong tệp điểm kiểm tra tạm thời dưới đối số backup_dir
cho BackupAndRestore
. Điều này được thực hiện vào cuối mỗi kỷ nguyên.
Khi công việc bị gián đoạn và khởi động lại, lệnh gọi lại sẽ khôi phục điểm kiểm tra cuối cùng và quá trình đào tạo tiếp tục từ đầu kỷ nguyên bị gián đoạn. Bất kỳ quá trình đào tạo từng phần nào đã được thực hiện trong kỷ nguyên chưa hoàn thành trước khi bị gián đoạn sẽ bị loại bỏ, để nó không ảnh hưởng đến trạng thái mô hình cuối cùng.
Để sử dụng nó, hãy cung cấp một phiên bản của tf.keras.callbacks.BackupAndRestore
tại lệnh gọi Model.fit
.
Với MultiWorkerMirroredStrategy
, nếu một worker bị gián đoạn, toàn bộ cụm sẽ tạm dừng cho đến khi khởi động lại worker bị gián đoạn. Các công nhân khác cũng sẽ khởi động lại và công nhân bị gián đoạn tham gia lại cụm. Sau đó, mọi công nhân đọc tệp điểm kiểm tra đã được lưu trước đó và chọn trạng thái cũ của nó, do đó cho phép cụm đồng bộ trở lại. Sau đó, quá trình đào tạo tiếp tục.
Lệnh gọi lại BackupAndRestore
sử dụng CheckpointManager
để lưu và khôi phục trạng thái đào tạo, tạo một tệp có tên là điểm kiểm tra theo dõi các điểm kiểm tra hiện có cùng với điểm kiểm tra mới nhất. Vì lý do này, backup_dir
không nên được sử dụng lại để lưu trữ các trạm kiểm soát khác nhằm tránh xung đột tên.
Hiện tại, lệnh gọi lại BackupAndRestore
hỗ trợ đào tạo một nhân viên mà không cần chiến lược— MirroredStrategy
—và đào tạo nhiều nhân viên với MultiWorkerMirroredStrategy
.
Dưới đây là hai ví dụ cho cả đào tạo nhiều người lao động và đào tạo một người lao động:
# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/3 70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123 Epoch 2/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509 Epoch 3/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614 <keras.callbacks.History at 0x7f6669555d90>
Nếu bạn kiểm tra thư mục backup_dir
mà bạn đã chỉ định trong BackupAndRestore
, bạn có thể nhận thấy một số tệp điểm kiểm tra được tạo tạm thời. Các tệp đó cần thiết để khôi phục các phiên bản đã mất trước đó và chúng sẽ bị thư viện xóa ở cuối Model.fit
khi thoát thành công khóa đào tạo của bạn.
Các nguồn bổ sung
- Hướng dẫn đào tạo Phân phối trong TensorFlow cung cấp tổng quan về các chiến lược phân phối có sẵn.
- Vòng đào tạo tùy chỉnh với Keras và MultiWorkerMirroredStrategy hướng dẫn cách sử dụng
MultiWorkerMirroredStrategy
với Keras và một vòng đào tạo tùy chỉnh. - Kiểm tra các mô hình chính thức, nhiều mô hình trong số đó có thể được định cấu hình để chạy nhiều chiến lược phân phối.
- Hướng dẫn Hiệu suất tốt hơn với tf. Chức năng cung cấp thông tin về các chiến lược và công cụ khác, chẳng hạn như TensorFlow Profiler mà bạn có thể sử dụng để tối ưu hóa hiệu suất của các mô hình TensorFlow của mình.