مشاهده در TensorFlow.org | در Google Colab اجرا شود | مشاهده منبع در GitHub | دانلود دفترچه یادداشت |
بررسی اجمالی
این آموزش نحوه اجرای آموزش توزیع شده چندکاره با مدل Keras و Model.fit
API را با استفاده از tf.distribute.Strategy
API - به ویژه کلاس tf.distribute.MultiWorkerMirroredStrategy
می دهد. با کمک این استراتژی، یک مدل Keras که برای اجرا بر روی یک کارگر طراحی شده است، می تواند به طور یکپارچه روی چندین کارگر با حداقل تغییرات کد کار کند.
برای کسانی که علاقه مند به درک عمیق تر از API های tf.distribute.Strategy
، آموزش توزیع شده در راهنمای TensorFlow برای مروری بر استراتژی های توزیع که TensorFlow پشتیبانی می کند در دسترس است.
برای یادگیری نحوه استفاده از MultiWorkerMirroredStrategy
با Keras و یک حلقه آموزشی سفارشی، به حلقه آموزش سفارشی با Keras و MultiWorkerMirroredStrategy مراجعه کنید.
توجه داشته باشید که هدف از این آموزش نشان دادن یک مثال حداقل چند کارگری با دو کارگر است.
برپایی
با برخی واردات ضروری شروع کنید:
import json
import os
import sys
قبل از وارد کردن TensorFlow، چند تغییر در محیط ایجاد کنید:
- تمام پردازنده های گرافیکی را غیر فعال کنید. این از خطاهای ناشی از تلاش کارگران برای استفاده از یک GPU جلوگیری می کند. در یک برنامه دنیای واقعی، هر کارگر روی یک ماشین متفاوت خواهد بود.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
- متغیر محیطی
TF_CONFIG
را بازنشانی کنید (در ادامه بیشتر در مورد آن خواهید آموخت):
os.environ.pop('TF_CONFIG', None)
- مطمئن شوید که دایرکتوری فعلی در مسیر پایتون قرار دارد - این به نوت بوک اجازه می دهد تا فایل های نوشته شده توسط
%%writefile
بعدا وارد کند:
if '.' not in sys.path:
sys.path.insert(0, '.')
اکنون TensorFlow را وارد کنید:
import tensorflow as tf
مجموعه داده و تعریف مدل
در مرحله بعد، یک فایل mnist_setup.py
با یک مدل ساده و تنظیمات مجموعه ایجاد کنید. این فایل پایتون توسط فرآیندهای کارگر در این آموزش استفاده خواهد شد:
%%writefile mnist_setup.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the [0, 255] range.
# You need to convert them to float32 with values in the [0, 1] range.
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
return train_dataset
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
Writing mnist_setup.py
آموزش نمونه روی یک کارگر مجرد
سعی کنید مدل را برای تعداد کمی از دوره ها آموزش دهید و نتایج یک کارگر را مشاهده کنید تا مطمئن شوید همه چیز به درستی کار می کند. با پیشرفت تمرین، ضرر باید کاهش یابد و دقت باید افزایش یابد.
import mnist_setup
batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz 11493376/11490434 [==============================] - 0s 0us/step 11501568/11490434 [==============================] - 0s 0us/step 2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected Epoch 1/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788 Epoch 2/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185 Epoch 3/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795 <keras.callbacks.History at 0x7f666a2e4510>
پیکربندی چند کارگری
حالا بیایید وارد دنیای آموزش چندکاره شویم.
خوشه ای با مشاغل و وظایف
در TensorFlow، آموزش توزیع شده شامل: یک 'cluster'
با چندین کار، و هر یک از مشاغل ممکن است یک یا چند 'task'
داشته باشد.
شما به متغیر محیطی پیکربندی TF_CONFIG
برای آموزش روی چندین ماشین نیاز دارید، که احتمالاً هر کدام نقش متفاوتی دارند. TF_CONFIG
یک رشته JSON است که برای تعیین پیکربندی خوشه برای هر کارگری که بخشی از خوشه است استفاده می شود.
دو جزء از یک متغیر TF_CONFIG
وجود دارد: 'cluster'
و 'task'
.
یک
'cluster'
برای همه کارگران یکسان است و اطلاعاتی در مورد خوشه آموزشی ارائه می دهد، که دستوری است از انواع مختلفی از مشاغل، مانند'worker'
یا'chief'
.- در آموزش چندکارهای با
tf.distribute.MultiWorkerMirroredStrategy
، معمولاً یک'worker'
وجود دارد که علاوه بر کاری که یک'worker'
معمولی انجام میدهد، مسئولیتهایی مانند ذخیره یک پست بازرسی و نوشتن یک فایل خلاصه برای TensorBoard را بر عهده میگیرد. چنین'worker'
به عنوان کارگر اصلی (با نام شغل'chief'
) شناخته می شود. - مرسوم است که
'chief'
'index'
0
را به آن منصوب می کند (در واقع،tf.distribute.Strategy
به این ترتیب پیاده سازی می شود).
- در آموزش چندکارهای با
یک
'task'
اطلاعات مربوط به وظیفه فعلی را ارائه می دهد و برای هر کارگر متفاوت است.'type'
و'index'
آن کارگر را مشخص می کند.
در زیر یک نمونه پیکربندی است:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
در اینجا همان TF_CONFIG
است که به عنوان یک رشته JSON سریال شده است:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
توجه داشته باشید که tf_config
فقط یک متغیر محلی در پایتون است. برای اینکه بتوان از آن برای پیکربندی آموزشی استفاده کرد، این دیکت باید به صورت JSON سریال شود و در یک متغیر محیطی TF_CONFIG
قرار گیرد.
در پیکربندی مثال بالا، وظیفه 'type'
را روی 'worker'
و وظیفه 'index'
را روی 0
تنظیم می کنید. بنابراین این دستگاه اولین کارگر است. به عنوان کارگر 'chief'
منصوب می شود و بیشتر از بقیه کار می کند.
برای اهداف تصویری، این آموزش نشان میدهد که چگونه میتوانید یک متغیر TF_CONFIG
را با دو کارگر در یک میزبان localhost
تنظیم کنید.
در عمل، چندین کارگر روی آدرسها/پورتهای IP خارجی ایجاد میکنید و بر اساس آن یک متغیر TF_CONFIG
روی هر کارگر تنظیم میکنید.
در این آموزش شما از دو کارگر استفاده خواهید کرد:
- اولین (
'chief'
)TF_CONFIG
کارگر در بالا نشان داده شده است. - برای کارگر دوم،
tf_config['task']['index']=1
را تنظیم خواهید کرد
متغیرهای محیطی و فرآیندهای فرعی در نوت بوک ها
فرآیندهای فرعی متغیرهای محیطی را از والد خود به ارث می برند.
به عنوان مثال، می توانید یک متغیر محیطی را در این فرآیند نوت بوک Jupyter به صورت زیر تنظیم کنید:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
سپس، می توانید از یک زیر فرآیند به متغیر محیطی دسترسی پیدا کنید:
echo ${GREETINGS}
Hello TensorFlow!
در بخش بعدی، از روش مشابهی برای ارسال TF_CONFIG
به زیرفرایندهای کارگر استفاده خواهید کرد. در یک سناریوی دنیای واقعی، شما مشاغل خود را به این روش راه اندازی نمی کنید، اما در این مثال کافی است.
استراتژی مناسب را انتخاب کنید
در TensorFlow، دو شکل اصلی آموزش توزیع شده وجود دارد:
- آموزش همزمان ، که در آن مراحل آموزش در بین کارگران و کپی ها همگام سازی می شود و
- آموزش ناهمزمان ، که در آن مراحل آموزش به طور دقیق همگام سازی نمی شوند (به عنوان مثال، آموزش سرور پارامتر ).
این آموزش نشان می دهد که چگونه می توان آموزش چند کارگری همزمان را با استفاده از نمونه tf.distribute.MultiWorkerMirroredStrategy
انجام داد.
MultiWorkerMirroredStrategy
کپیهایی از همه متغیرها در لایههای مدل در هر دستگاه در همه کارگران ایجاد میکند. از CollectiveOps
، یک عملیات TensorFlow برای ارتباطات جمعی، برای جمعآوری گرادیانها و همگام نگه داشتن متغیرها استفاده میکند. راهنمای tf.distribute.Strategy
جزئیات بیشتری در مورد این استراتژی دارد.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
MultiWorkerMirroredStrategy
چندین پیاده سازی را از طریق پارامتر tf.distribute.experimental.CommunicationOptions
فراهم می کند: 1) RING
مجموعه های مبتنی بر حلقه را با استفاده از gRPC به عنوان لایه ارتباطی متقابل میزبان پیاده سازی می کند. 2) NCCL
از NVIDIA Collective Communication Library برای پیاده سازی جمع ها استفاده می کند. و 3) AUTO
انتخاب را به زمان اجرا موکول می کند. بهترین انتخاب پیاده سازی جمعی به تعداد و نوع GPUها و اتصال شبکه در خوشه بستگی دارد.
مدل را آموزش دهید
با ادغام tf.distribute.Strategy
API در tf.keras
، تنها تغییری که برای توزیع آموزش بین چند کارگر ایجاد خواهید کرد، احاطه ساختن مدل و فراخوانی model.compile model.compile()
در داخل strategi.scope strategy.scope()
است. دامنه استراتژی توزیع تعیین می کند که چگونه و کجا متغیرها ایجاد شوند، و در مورد MultiWorkerMirroredStrategy
، متغیرهای ایجاد شده MirroredVariable
s هستند و بر روی هر یک از کارگران تکرار می شوند.
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
برای اجرای واقعی با MultiWorkerMirroredStrategy
، باید فرآیندهای کارگر را اجرا کنید و یک TF_CONFIG
به آنها ارسال کنید.
مانند فایل mnist_setup.py
که قبلا نوشته شده است، در اینجا main.py
است که هر یک از کارگران اجرا خواهند کرد:
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist_setup
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
strategy = tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py
در قطعه کد بالا توجه داشته باشید که global_batch_size
که به Dataset.batch
می شود، روی per_worker_batch_size * num_workers
شده است. این تضمین می کند که هر کارگر دسته هایی از نمونه های per_worker_batch_size
بدون توجه به تعداد کارگران پردازش می کند.
فهرست فعلی شامل هر دو فایل پایتون است:
ls *.py
main.py mnist_setup.py
بنابراین TF_CONFIG
را json سریال کنید و آن را به متغیرهای محیط اضافه کنید:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
اکنون، می توانید یک فرآیند worker را راه اندازی کنید که main.py را اجرا می کند و از main.py
استفاده می TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
در مورد دستور بالا باید به چند نکته توجه کرد:
- از
%%bash
که یک نوت بوک "جادویی" است برای اجرای برخی از دستورات bash استفاده می کند. - از پرچم
--bg
برای اجرای فرآیندbash
در پسزمینه استفاده میکند، زیرا این کارگر خاتمه نمییابد. قبل از شروع کار منتظر همه کارگران است.
فرآیند کارگر پسزمینه خروجی را در این نوتبوک چاپ نمیکند، بنابراین &>
خروجی خود را به یک فایل هدایت میکند تا بتوانید بعداً آنچه را که در یک فایل گزارش اتفاق افتاده است بررسی کنید.
بنابراین، چند ثانیه صبر کنید تا فرآیند شروع شود:
import time
time.sleep(10)
اکنون، آنچه را که تاکنون در فایل گزارش کارگر خروجی داده شده است، بررسی کنید:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
خط آخر فایل لاگ باید بگوید: Started server with target: grpc://localhost:12345
. اولین کارگر اکنون آماده است و منتظر است تا همه کارگران دیگر آماده ادامه کار باشند.
بنابراین tf_config
را برای پردازش دومین کارگر بهروزرسانی کنید:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
دومین کارگر را راه اندازی کنید. این آموزش را آغاز می کند زیرا همه کارگران فعال هستند (بنابراین نیازی به پیشینه این فرآیند نیست):
python main.py
Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901 2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
اگر گزارشهای نوشته شده توسط اولین کارگر را دوباره بررسی کنید، متوجه خواهید شد که در آموزش آن مدل شرکت کرده است:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
جای تعجب نیست که این کار کندتر از اجرای آزمایشی در ابتدای این آموزش بود.
راه اندازی چندین کارگر روی یک ماشین تنها باعث افزایش هزینه می شود.
هدف در اینجا بهبود زمان آموزش نبود، بلکه تنها ارائه مثالی از آموزش چند کارگری بود.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
آموزش عمیق چند کارگری
تا کنون، نحوه اجرای یک راه اندازی اولیه چندکاره را یاد گرفته اید.
در ادامه آموزش، با سایر عوامل، که ممکن است برای موارد استفاده واقعی مفید یا مهم باشند، به تفصیل آشنا خواهید شد.
اشتراک گذاری مجموعه داده
در آموزش چند کارگری، به اشتراک گذاری داده ها برای اطمینان از همگرایی و عملکرد مورد نیاز است.
مثال در بخش قبل متکی به autosharding پیش فرض ارائه شده توسط tf.distribute.Strategy
API است. میتوانید با تنظیم tf.data.experimental.AutoShardPolicy
tf.data.experimental.DistributeOptions
را کنترل کنید.
برای کسب اطلاعات بیشتر در مورد اشتراک گذاری خودکار ، به راهنمای ورودی توزیع شده مراجعه کنید.
در اینجا یک مثال سریع از نحوه غیرفعال کردن اشتراک گذاری خودکار آورده شده است، به طوری که هر کپی هر نمونه را پردازش می کند ( توصیه نمی شود ):
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
ارزیابی
اگر validation_data
را به Model.fit
کنید، برای هر دوره بین آموزش و ارزیابی متناوب خواهد شد. validation_data
که از داده های
مشابه آموزش، مجموعه داده اعتبارسنجی به طور خودکار در سطح فایل به اشتراک گذاشته می شود. شما باید یک اندازه دسته کلی را در مجموعه داده اعتبار سنجی تنظیم کنید و validation_steps
را تنظیم کنید.
یک مجموعه داده مکرر نیز برای ارزیابی توصیه می شود.
همچنین، میتوانید کار دیگری ایجاد کنید که به صورت دورهای نقاط بازرسی را میخواند و ارزیابی را اجرا میکند. این کاری است که استیماتور انجام می دهد. اما این روش توصیه ای برای انجام ارزیابی نیست و بنابراین جزئیات آن حذف می شود.
کارایی
اکنون یک مدل Keras دارید که برای اجرا در چندین کارگر با MultiWorkerMirroredStrategy
شده است.
برای تغییر عملکرد آموزش چند کارگری، می توانید موارد زیر را امتحان کنید:
tf.distribute.MultiWorkerMirroredStrategy
چندین پیاده سازی ارتباط جمعی را ارائه می دهد:-
RING
مجموعه های مبتنی بر حلقه را با استفاده از gRPC به عنوان لایه ارتباطی متقابل میزبان پیاده سازی می کند. -
NCCL
از کتابخانه ارتباطات جمعی NVIDIA برای پیاده سازی مجموعه ها استفاده می کند. -
AUTO
انتخاب را به زمان اجرا موکول می کند.
بهترین انتخاب پیاده سازی جمعی به تعداد GPU، نوع GPU و اتصال شبکه در خوشه بستگی دارد. برای لغو انتخاب خودکار، پارامتر
communication_options
MultiWorkerMirroredStrategy
را مشخص کنید. مثلا:communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
-
در صورت امکان متغیرها را به
tf.float
کنید:- مدل رسمی ResNet شامل مثالی از نحوه انجام این کار است.
تحمل خطا
در آموزش همزمان، اگر یکی از کارگران شکست بخورد و مکانیزم بازیابی شکست وجود نداشته باشد، خوشه شکست میخورد.
استفاده از Keras با tf.distribute.Strategy
دارای مزیت تحمل خطا در مواردی است که کارگران می میرند یا ناپایدار هستند. شما می توانید این کار را با حفظ وضعیت آموزشی در سیستم فایل توزیع شده انتخابی خود انجام دهید، به طوری که پس از راه اندازی مجدد نمونه ای که قبلا شکست خورده یا از قبل استفاده شده بود، وضعیت آموزشی بازیابی می شود.
وقتی یک کارگر در دسترس نباشد، سایر کارگران شکست خواهند خورد (احتمالاً پس از یک تایم اوت). در چنین مواردی، کارگر غیرقابل دسترسی و همچنین سایر کارگرانی که شکست خورده اند، نیاز به راه اندازی مجدد دارند.
پاسخ به تماس مدل Checkpoint
پاسخ به تماس ModelCheckpoint
دیگر قابلیت تحمل خطا را ارائه نمی دهد، لطفاً به جای آن از BackupAndRestore
استفاده کنید.
پاسخ تماس ModelCheckpoint
همچنان می تواند برای ذخیره نقاط بازرسی استفاده شود. اما با این کار، اگر آموزش قطع شد یا با موفقیت به پایان رسید، برای ادامه آموزش از نقطه بازرسی، کاربر موظف است مدل را به صورت دستی بارگذاری کند.
به صورت اختیاری، کاربر می تواند ذخیره و بازیابی مدل/وزن ها را خارج از پاسخ به تماس ModelCheckpoint
کند.
ذخیره و بارگذاری مدل
برای ذخیره مدل خود با استفاده از model.save
یا tf.saved_model.save
، مقصد ذخیره باید برای هر کارگر متفاوت باشد.
- برای کارگران غیر ارشد، باید مدل را در یک فهرست موقت ذخیره کنید.
- برای رئیس، باید در پوشه مدل ارائه شده ذخیره کنید.
دایرکتوری های موقت روی کارگر باید منحصر به فرد باشند تا از خطاهای ناشی از تلاش چندین کارگر برای نوشتن در یک مکان جلوگیری شود.
مدل ذخیره شده در همه دایرکتوری ها یکسان است و معمولاً فقط مدل ذخیره شده توسط chief باید برای بازیابی یا ارائه ارجاع داده شود.
شما باید یک منطق پاکسازی داشته باشید که پس از اتمام آموزش، دایرکتوری های موقت ایجاد شده توسط کارگران را حذف می کند.
دلیل صرفه جویی همزمان روی رئیس و کارگران این است که ممکن است متغیرهایی را در حین بازرسی جمع آوری کنید که هم رئیس و هم کارگران باید در پروتکل ارتباطی allreduce شرکت کنند. از سوی دیگر، اجازه دادن به مدیر و کارگران در یک فهرست مدل یکسان باعث بروز خطاهای ناشی از مشاجره می شود.
با استفاده از MultiWorkerMirroredStrategy
، برنامه بر روی هر کارگر اجرا میشود و برای اینکه بداند کارگر فعلی رئیس است یا خیر، از شیء حلکننده کلاستر که دارای ویژگیهای task_type
و task_id
است بهره میبرد:
-
task_type
به شما می گوید که شغل فعلی چیست (به عنوان مثال'worker'
). -
task_id
شناسه کارگر را به شما می گوید. - کارگر با
task_id == 0
به عنوان کارگر اصلی تعیین می شود.
در قطعه کد زیر، تابع write_filepath
مسیر فایل را برای نوشتن ارائه میکند که به task_id
کارگر بستگی دارد:
- برای chief worker (با
task_id == 0
)، در مسیر فایل اصلی می نویسد. - برای سایر کارگران، یک دایرکتوری موقت –
temp_dir
– باtask_id
در مسیر دایرکتوری ایجاد میکند تا در آن بنویسد:
model_path = '/tmp/keras-model'
def _is_chief(task_type, task_id):
# Note: there are two possible `TF_CONFIG` configuration.
# 1) In addition to `worker` tasks, a `chief` task type is use;
# in this case, this function should be modified to
# `return task_type == 'chief'`.
# 2) Only `worker` task type is used; in this case, worker 0 is
# regarded as the chief. The implementation demonstrated here
# is for this case.
# For the purpose of this Colab section, the `task_type is None` case
# is added because it is effectively run with only a single worker.
return (task_type == 'worker' and task_id == 0) or task_type is None
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
با آن، اکنون آماده ذخیره کردن هستید:
multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/keras-model/assets INFO:tensorflow:Assets written to: /tmp/keras-model/assets
همانطور که در بالا توضیح داده شد، بعداً مدل باید فقط از مسیری که در آن ذخیره شده است بارگذاری شود، بنابراین بیایید موارد موقتی را که کارگران غیر ارشد ذخیره کرده اند حذف کنیم:
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(os.path.dirname(write_model_path))
اکنون، زمانی که زمان بارگیری فرا می رسد، بیایید از tf.keras.models.load_model
API مناسب استفاده کنیم و به کار بیشتر ادامه دهیم.
در اینجا، فرض کنید که فقط از یک کارگر برای بارگیری و ادامه آموزش استفاده میکنید، در این صورت، tf.keras.models.load_model
در یک strate.scope strategy.scope()
دیگر فراخوانی نمیکنید (توجه داشته باشید که strategy = tf.distribute.MultiWorkerMirroredStrategy()
، همانطور که قبلاً تعریف شد. ):
loaded_model = tf.keras.models.load_model(model_path)
# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2 20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773 <keras.callbacks.History at 0x7f6669989750>
ذخیره و بازیابی ایست بازرسی
از طرف دیگر، چک پوینت به شما این امکان را می دهد که وزن های مدل خود را ذخیره کرده و بدون نیاز به ذخیره کل مدل، آنها را بازیابی کنید.
در اینجا، یک tf.train.Checkpoint
ایجاد میکنید که مدل را ردیابی میکند، که توسط tf.train.CheckpointManager
مدیریت میشود، به طوری که فقط آخرین نقطه بازرسی حفظ میشود:
checkpoint_dir = '/tmp/ckpt'
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
پس از راهاندازی CheckpointManager
، اکنون آماده ذخیره و حذف پستهای بازرسی هستید که کارگران غیر ارشد ذخیره کردهاند:
checkpoint_manager.save()
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(write_checkpoint_dir)
اکنون، زمانی که نیاز به بازیابی مدل دارید، میتوانید با استفاده از عملکرد راحت tf.train.latest_checkpoint
، آخرین پست بازرسی ذخیره شده را پیدا کنید. پس از بازگرداندن پست بازرسی، می توانید به آموزش ادامه دهید.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/2 2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. 20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938 <keras.callbacks.History at 0x7f6669589850>
BackupAndRestore callback
پاسخ تماس tf.keras.callbacks.BackupAndRestore
عملکرد تحمل خطا را با پشتیبان گیری از مدل و شماره دوره فعلی در فایل ایست بازرسی موقت تحت آرگومان backup_dir
در BackupAndRestore
می کند. این کار در پایان هر دوره انجام می شود.
هنگامی که کارها قطع می شوند و مجدداً راه اندازی می شوند، فراخوان آخرین نقطه بازرسی را بازیابی می کند و آموزش از ابتدای دوره قطع شده ادامه می یابد. هر گونه آموزش جزئی که قبلاً در دوره ناتمام قبل از وقفه انجام شده است دور ریخته می شود، به طوری که بر وضعیت مدل نهایی تأثیر نمی گذارد.
برای استفاده از آن، نمونه ای از tf.keras.callbacks.BackupAndRestore
را در تماس Model.fit
.
با MultiWorkerMirroredStrategy
، اگر کارگری دچار وقفه شود، کل خوشه متوقف می شود تا زمانی که کارگر قطع شده مجدداً راه اندازی شود. سایر کارگران نیز راه اندازی مجدد خواهند شد و کارگر قطع شده دوباره به خوشه می پیوندد. سپس، هر کارگر فایل ایست بازرسی را که قبلاً ذخیره شده بود میخواند و حالت قبلی خود را میگیرد، در نتیجه به خوشه اجازه میدهد دوباره همگام شود. سپس، آموزش ادامه می یابد.
BackupAndRestore
callback از CheckpointManager
برای ذخیره و بازیابی وضعیت آموزشی استفاده میکند، که فایلی به نام checkpoint تولید میکند که پستهای بازرسی موجود را به همراه آخرین مورد ردیابی میکند. به همین دلیل، backup_dir
نباید دوباره برای ذخیره سایر نقاط بازرسی استفاده شود تا از تصادم نام جلوگیری شود.
در حال حاضر، BackupAndRestore
از آموزش تک کارگری بدون استراتژی – MirroredStrategy
– و آموزش چندکاره با MultiWorkerMirroredStrategy
می کند.
در زیر دو مثال برای آموزش چند کارگری و آموزش تک کارگری آورده شده است:
# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/3 70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123 Epoch 2/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509 Epoch 3/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614 <keras.callbacks.History at 0x7f6669555d90>
اگر دایرکتوری backup_dir
را که در BackupAndRestore
مشخص کردهاید بررسی کنید، ممکن است متوجه برخی از فایلهای ایست بازرسی موقت شوید. این فایلها برای بازیابی نمونههای از دست رفته قبلی مورد نیاز هستند، و پس از خروج موفقیتآمیز از آموزش، در انتهای Model.fit
توسط کتابخانه حذف خواهند شد.
منابع اضافی
- راهنمای آموزشی Distributed in TensorFlow یک نمای کلی از استراتژی های توزیع موجود را ارائه می دهد.
- حلقه آموزشی Custom with Keras و MultiWorkerMirroredStrategy آموزش نحوه استفاده از
MultiWorkerMirroredStrategy
با Keras و یک حلقه آموزشی سفارشی را نشان می دهد. - مدلهای رسمی را بررسی کنید، که بسیاری از آنها را میتوان برای اجرای چندین استراتژی توزیع پیکربندی کرد.
- راهنمای عملکرد بهتر با tf.function اطلاعاتی درباره استراتژیها و ابزارهای دیگر، مانند TensorFlow Profiler که میتوانید برای بهینهسازی عملکرد مدلهای TensorFlow خود استفاده کنید، ارائه میدهد.