סקירה כללית
מדריך זה מדגים כיצד לבצע הכשרה מבוזרת מרובת עובדים עם מודל Keras ו-API של Model.fit
באמצעות ה-API של tf.distribute.Strategy
- במיוחד בכיתה tf.distribute.MultiWorkerMirroredStrategy
. בעזרת אסטרטגיה זו, מודל Keras שתוכנן לפעול על עובד יחיד יכול לעבוד בצורה חלקה על מספר עובדים עם שינויי קוד מינימליים.
למי שמעוניין בהבנה מעמיקה יותר של ממשקי API של tf.distribute.Strategy
, מדריך ההדרכה המבוזר ב-TensorFlow זמין עבור סקירה כללית של אסטרטגיות ההפצה שבהן תומכות TensorFlow.
כדי ללמוד כיצד להשתמש ב- MultiWorkerMirroredStrategy
עם Keras ובלולאת אימון מותאמת אישית, עיין בלולאת אימון מותאמת אישית עם Keras ו-MultiWorkerMirroredStrategy .
שימו לב שמטרת המדריך הזה היא להדגים דוגמה מינימלית של ריבוי עובדים עם שני עובדים.
להכין
התחל עם כמה יבוא נחוץ:
import json
import os
import sys
לפני ייבוא TensorFlow, בצע מספר שינויים בסביבה:
- השבת את כל ה-GPUs. זה מונע שגיאות הנגרמות על ידי העובדים שכולם מנסים להשתמש באותו GPU. ביישום בעולם האמיתי, כל עובד יהיה על מכונה אחרת.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
- אפס את משתנה הסביבה
TF_CONFIG
(תלמד עוד על כך מאוחר יותר):
os.environ.pop('TF_CONFIG', None)
- ודא שהספרייה הנוכחית נמצאת בנתיב של Python - זה מאפשר למחברת לייבא את הקבצים שנכתבו על ידי
%%writefile
מאוחר יותר:
if '.' not in sys.path:
sys.path.insert(0, '.')
כעת ייבא את TensorFlow:
import tensorflow as tf
מערך נתונים והגדרת מודל
לאחר מכן, צור קובץ mnist_setup.py
עם מודל פשוט והגדרת מערך נתונים. קובץ Python זה ישמש את תהליכי העבודה במדריך זה:
%%writefile mnist_setup.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the [0, 255] range.
# You need to convert them to float32 with values in the [0, 1] range.
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
return train_dataset
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
Writing mnist_setup.py
הדרכת מודל על עובד בודד
נסה לאמן את המודל למספר קטן של תקופות וצפה בתוצאות של עובד בודד כדי לוודא שהכל עובד כמו שצריך. ככל שהאימון מתקדם, ההפסד אמור לרדת והדיוק צריך לעלות.
import mnist_setup
batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz 11493376/11490434 [==============================] - 0s 0us/step 11501568/11490434 [==============================] - 0s 0us/step 2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected Epoch 1/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788 Epoch 2/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185 Epoch 3/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795 <keras.callbacks.History at 0x7f666a2e4510>
תצורה של ריבוי עובדים
עכשיו בואו ניכנס לעולם ההכשרה מרובת עובדים.
אשכול עם משרות ומשימות
ב-TensorFlow, הכשרה מבוזרת כוללת: 'cluster'
עם מספר משרות, ולכל אחת מהעבודות עשויה להיות 'task'
אחת או יותר.
תזדקק למשתנה סביבת התצורה TF_CONFIG
לאימון במספר מכונות, שלכל אחת מהן יש אולי תפקיד אחר. TF_CONFIG
היא מחרוזת JSON המשמשת לציון תצורת האשכול עבור כל עובד שהוא חלק מהאשכול.
ישנם שני מרכיבים של משתנה TF_CONFIG
: 'cluster'
ו-'משימה 'task'
.
'cluster'
זהה לכל העובדים ומספק מידע על אשכול ההכשרה, שהוא גזרה המורכבת מסוגים שונים של משרות, כגון'worker'
או'chief'
.- בהכשרה מרובת עובדים עם
tf.distribute.MultiWorkerMirroredStrategy
, יש בדרך כלל'worker'
אחד שלוקח על עצמו אחריות, כמו שמירת מחסום וכתיבת קובץ סיכום עבור TensorBoard, בנוסף למה שעושה'worker'
רגיל.'worker'
כזה מכונה העובד הראשי (עם שם העבודה'chief'
). - נהוג של'צ'יף
'chief'
יתמנה'index'
0
(למעשה, כךtf.distribute.Strategy
).
- בהכשרה מרובת עובדים עם
'task'
מספקת מידע על המשימה הנוכחית והיא שונה עבור כל עובד. הוא מציין את'type'
'index'
של אותו עובד.
להלן דוגמה לתצורה:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
הנה אותו TF_CONFIG
כמחרוזת JSON:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
שימו לב ש- tf_config
הוא רק משתנה מקומי ב-Python. כדי להיות מסוגל להשתמש בו עבור תצורת אימון, dict זה צריך להיות מסודר כ-JSON ולהציב אותו במשתנה סביבה TF_CONFIG
.
בתצורת הדוגמה שלמעלה, אתה מגדיר את המשימה 'type'
'worker'
ואת המשימה 'index'
ל 0
. לכן, מכונה זו היא העובדת הראשונה . הוא ימונה כעובד 'chief'
ויעשה יותר עבודה מהאחרים.
למטרות המחשה, מדריך זה מראה כיצד תוכל להגדיר משתנה TF_CONFIG
עם שני עובדים ב- localhost
.
בפועל, תיצור מספר עובדים על כתובות/יציאות IP חיצוניות ותגדיר משתנה TF_CONFIG
על כל עובד בהתאם.
במדריך זה, תשתמש בשני עובדים:
- ה-
TF_CONFIG
של העובד הראשון ('chief'
) מוצג למעלה. - עבור העובד השני, תגדיר
tf_config['task']['index']=1
משתני סביבה ותתי תהליכים במחברות
תהליכי משנה יורשים משתני סביבה מהאב שלהם.
לדוגמה, אתה יכול להגדיר משתנה סביבה בתהליך זה של Jupyter Notebook באופן הבא:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
לאחר מכן, תוכל לגשת למשתנה הסביבה מתת-תהליכים:
echo ${GREETINGS}
Hello TensorFlow!
בסעיף הבא, תשתמש בשיטה דומה כדי להעביר את ה- TF_CONFIG
לתת-תהליכי העבודה. בתרחיש של העולם האמיתי, לא היית משיק את העבודות שלך בדרך זו, אבל זה מספיק בדוגמה זו.
בחר את האסטרטגיה הנכונה
ב-TensorFlow, קיימות שתי צורות עיקריות של הכשרה מבוזרת:
- אימון סינכרוני , שבו שלבי ההדרכה מסונכרנים בין העובדים והעותקים, וכן
- אימון אסינכרוני , שבו שלבי האימון אינם מסונכרנים בקפדנות (לדוגמה, אימון שרת פרמטרים ).
מדריך זה מדגים כיצד לבצע הדרכה סינכרונית מרובה עובדים באמצעות מופע של tf.distribute.MultiWorkerMirroredStrategy
.
MultiWorkerMirroredStrategy
יוצר עותקים של כל המשתנים בשכבות המודל בכל מכשיר על פני כל העובדים. הוא משתמש ב- CollectiveOps
, אופציה של TensorFlow לתקשורת קולקטיבית, כדי לצבור שיפועים ולשמור את המשתנים מסונכרנים. במדריך tf.distribute.Strategy
יש פרטים נוספים על אסטרטגיה זו.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
MultiWorkerMirroredStrategy
מספקת יישומים מרובים באמצעות הפרמטר tf.distribute.experimental.CommunicationOptions
: 1) RING
מיישמת קולקטיבים מבוססי טבעת המשתמשים ב-gRPC כשכבת התקשורת המארחת; 2) NCCL
משתמשת בספריית התקשורת הקולקטיבית של NVIDIA כדי ליישם קולקטיבים; ו-3) AUTO
דוחה את הבחירה לזמן הריצה. הבחירה הטובה ביותר של הטמעה קולקטיבית תלויה במספר וסוג ה-GPUs, ובחיבור הרשת באשכול.
אימון הדגם
עם השילוב של tf.distribute.Strategy
API לתוך tf.keras
, השינוי היחיד שתבצע כדי להפיץ את ההדרכה למספר עובדים הוא לצרף את בניית המודל ואת model.compile()
בתוך strategy.scope()
. היקף אסטרטגיית ההפצה מכתיב כיצד והיכן נוצרים המשתנים, ובמקרה של MultiWorkerMirroredStrategy
, המשתנים שנוצרו הם MirroredVariable
s, והם משוכפלים על כל אחד מהעובדים.
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
כדי לפעול בפועל עם MultiWorkerMirroredStrategy
תצטרך להפעיל תהליכי עבודה ולהעביר להם TF_CONFIG
.
כמו הקובץ mnist_setup.py
שנכתב קודם לכן, הנה ה- main.py
שכל אחד מהעובדים יפעיל:
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist_setup
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
strategy = tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py
בקטע הקוד למעלה, שים לב שה- global_batch_size
, שמועבר ל- Dataset.batch
, מוגדר ל- per_worker_batch_size * num_workers
. זה מבטיח שכל עובד מעבד אצוות של דוגמאות per_worker_batch_size
ללא קשר למספר העובדים.
הספרייה הנוכחית מכילה כעת את שני קבצי Python:
ls *.py
main.py mnist_setup.py
אז ה-json הסדר את ה- TF_CONFIG
והוסף אותו למשתני הסביבה:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
כעת, אתה יכול להפעיל תהליך עבודה שיריץ את ה- main.py
ולהשתמש ב- TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
יש לשים לב לכמה דברים לגבי הפקודה לעיל:
- הוא משתמש ב-
%%bash
שהוא "קסם" של מחברת כדי להפעיל כמה פקודות bash. - הוא משתמש בדגל
--bg
כדי להפעיל את תהליך ה-bash
ברקע, מכיוון שהעובד הזה לא יסתיים. זה מחכה לכל העובדים לפני שהוא מתחיל.
תהליך העבודה ברקע לא ידפיס פלט למחברת זו, ולכן &>
מפנה את הפלט שלו לקובץ כך שתוכל לבדוק מה קרה בקובץ יומן יומן מאוחר יותר.
לכן, המתן מספר שניות עד שהתהליך יתחיל:
import time
time.sleep(10)
כעת, בדוק מה יצא לקובץ היומן של העובד עד כה:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
השורה האחרונה של קובץ היומן צריכה לומר: Started server with target: grpc://localhost:12345
. העובד הראשון מוכן כעת, ומחכה שכל שאר העובדים יהיו מוכנים להמשיך.
אז עדכן את tf_config
כדי שהתהליך של העובד השני יאסוף:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
הפעל את העובד השני. זה יתחיל את ההכשרה מכיוון שכל העובדים פעילים (לכן אין צורך לרקע תהליך זה):
python main.py
Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901 2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
אם תבדוק שוב את היומנים שנכתבו על ידי העובד הראשון, תלמד שהוא השתתף בהכשרת המודל הזה:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
באופן לא מפתיע, זה רץ לאט יותר מאשר הפעלת המבחן בתחילת המדריך הזה.
הפעלת מספר עובדים על מכונה אחת רק מוסיפה תקורה.
המטרה כאן לא הייתה לשפר את זמן האימון, אלא רק לתת דוגמה לאימון רב עובדים.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
הכשרה רב עובדים לעומק
עד כה, למדת כיצד לבצע הגדרה בסיסית של ריבוי עובדים.
במהלך שאר המדריך, תלמד על גורמים אחרים, שעשויים להיות שימושיים או חשובים עבור מקרי שימוש אמיתיים, בפירוט.
פיצול מערכי נתונים
בהכשרה מרובת עובדים, יש צורך בפיצול נתונים כדי להבטיח התכנסות וביצועים.
הדוגמה בסעיף הקודם מסתמכת על ברירת המחדל האוטומטית שסופקה על ידי ה-API של tf.distribute.Strategy
. אתה יכול לשלוט בפיצול על ידי הגדרת tf.data.experimental.AutoShardPolicy
של tf.data.experimental.DistributeOptions
.
למידע נוסף על פיצול אוטומטי , עיין במדריך הקלט המבוזר .
הנה דוגמה מהירה כיצד לכבות את הריסוק האוטומטי, כך שכל העתק יעבד כל דוגמה ( לא מומלץ ):
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
הַעֲרָכָה
אם תעביר את ה- validation_data
לתוך Model.fit
, זה יעבור לסירוגין בין אימון להערכה עבור כל תקופה. ההערכה שלוקחת את נתוני validation_data
מופצת על פני אותה קבוצה של עובדים ותוצאות ההערכה מצטברות וזמינות עבור כל העובדים.
בדומה לאימון, מערך האימות נגזור אוטומטית ברמת הקובץ. עליך להגדיר גודל אצווה גלובלי במערך הנתונים של האימות ולהגדיר את validation_steps
.
מומלץ גם מערך נתונים חוזר להערכה.
לחלופין, ניתן גם ליצור משימה נוספת שקוראת מעת לעת נקודות ביקורת ומריצה את ההערכה. זה מה שאומדן עושה. אבל זו לא דרך מומלצת לביצוע הערכה ולכן פרטיה נשמטים.
ביצועים
כעת יש לך מודל Keras שכולו מוגדר לפעול במספר עובדים עם MultiWorkerMirroredStrategy
.
כדי לכוונן את הביצועים של הכשרה מרובה עובדים, אתה יכול לנסות את הפעולות הבאות:
tf.distribute.MultiWorkerMirroredStrategy
מספקת יישומי תקשורת קולקטיביים מרובים:-
RING
מיישמת קולקטיבים מבוססי טבעת המשתמשים ב-gRPC כשכבת התקשורת בין מארחים. -
NCCL
משתמש בספריית התקשורת הקולקטיבית של NVIDIA כדי ליישם קולקטיבים. -
AUTO
דוחה את הבחירה לזמן הריצה.
הבחירה הטובה ביותר של הטמעה קולקטיבית תלויה במספר ה-GPUs, סוג ה-GPUs וחיבור הרשת באשכול. כדי לעקוף את הבחירה האוטומטית, ציין את הפרמטר
communication_options
של הבנאי שלMultiWorkerMirroredStrategy
. לדוגמה:communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
-
העבר את המשתנים ל-
tf.float
אם אפשר:- המודל הרשמי של ResNet כולל דוגמה כיצד ניתן לעשות זאת.
סובלנות לתקלות
באימון סינכרוני, האשכול ייכשל אם אחד העובדים ייכשל ולא קיים מנגנון שחזור כשלים.
שימוש ב-Keras עם tf.distribute.Strategy
מגיע עם היתרון של סובלנות לתקלות במקרים בהם עובדים מתים או אינם יציבים בדרך אחרת. אתה יכול לעשות זאת על ידי שימור מצב האימון במערכת הקבצים המבוזרת לבחירתך, כך שעם הפעלה מחדש של המופע שנכשל בעבר או הקדים, מצב האימון ישוחזר.
כאשר עובד הופך לא זמין, עובדים אחרים ייכשלו (ייתכן לאחר פסק זמן). במקרים כאלה, יש להפעיל מחדש את העובד שאינו זמין, כמו גם עובדים אחרים שנכשלו.
Callback של ModelCheckpoint
התקשרות חוזרת ModelCheckpoint
אינה מספקת עוד פונקציונליות של סובלנות תקלות, אנא השתמש במקום זאת ב- BackupAndRestore
.
עדיין ניתן להשתמש בהתקשרות חזרה של ModelCheckpoint
כדי לשמור נקודות ביקורת. אבל עם זה, אם האימון הופסק או הסתיים בהצלחה, כדי להמשיך את האימון מהמחסום, המשתמש אחראי לטעון את הדגם באופן ידני.
לחלופין, המשתמש יכול לבחור לשמור ולשחזר דגם/משקלים מחוץ להתקשרות חזרה ModelCheckpoint
.
שמירת דגם וטעינה
כדי לשמור את הדגם שלך באמצעות model.save
או tf.saved_model.save
, יעד השמירה צריך להיות שונה עבור כל עובד.
- עבור עובדים שאינם עובדים ראשיים, תצטרך לשמור את הדגם בספרייה זמנית.
- עבור המפקד, תצטרך לשמור בספריית הדגמים המסופקת.
הספריות הזמניות של העובד צריכות להיות ייחודיות כדי למנוע שגיאות הנובעות ממספר עובדים שמנסים לכתוב לאותו מיקום.
הדגם שנשמר בכל המדריכים זהה, ובדרך כלל יש להפנות רק לדגם שנשמר על ידי המפקד לצורך שחזור או הגשה.
אמורה להיות לך היגיון ניקוי שמוחק את הספריות הזמניות שנוצרו על ידי העובדים לאחר השלמת ההכשרה שלך.
הסיבה לחיסכון במפקד ובעובדים בו-זמנית היא מכיוון שאתה עשוי לצבור משתנים במהלך נקודת הביקורת, מה שמחייב הן את המפקד והן מהעובדים להשתתף בפרוטוקול התקשורת של allreduce. מצד שני, מתן אפשרות למפקד ולעובדים לשמור באותו ספריית דגמים תגרום לשגיאות עקב מחלוקת.
באמצעות MultiWorkerMirroredStrategy
, התוכנית מופעלת על כל עובד, וכדי לדעת אם העובד הנוכחי הוא ראשי, היא מנצלת את אובייקט פותר האשכולות שיש לו תכונות task_type
ו- task_id
:
-
task_type
אומר לך מהי המשרה הנוכחית (למשל'worker'
). -
task_id
אומר לך את המזהה של העובד. - העובד עם
task_id == 0
מוגדר כעובד הראשי.
בקטע הקוד למטה, הפונקציה write_filepath
מספקת את נתיב הקובץ לכתיבה, שתלוי ב- task_id
של העובד:
- עבור העובד הראשי (עם
task_id == 0
), הוא כותב לנתיב הקובץ המקורי. - עבור עובדים אחרים, הוא יוצר ספרייה זמנית -
temp_dir
- עם ה-task_id
בנתיב הספרייה לכתיבה:
model_path = '/tmp/keras-model'
def _is_chief(task_type, task_id):
# Note: there are two possible `TF_CONFIG` configuration.
# 1) In addition to `worker` tasks, a `chief` task type is use;
# in this case, this function should be modified to
# `return task_type == 'chief'`.
# 2) Only `worker` task type is used; in this case, worker 0 is
# regarded as the chief. The implementation demonstrated here
# is for this case.
# For the purpose of this Colab section, the `task_type is None` case
# is added because it is effectively run with only a single worker.
return (task_type == 'worker' and task_id == 0) or task_type is None
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
עם זה, אתה מוכן כעת לחסוך:
multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/keras-model/assets INFO:tensorflow:Assets written to: /tmp/keras-model/assets
כפי שתואר לעיל, בהמשך יש לטעון את המודל רק ממפקד הנתיב שנשמר אליו, אז בואו נסיר את הזמניים שהעובדים הלא ראשיים הצילו:
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(os.path.dirname(write_model_path))
עכשיו, כשמגיע הזמן לטעון, בואו נשתמש ב- tf.keras.models.load_model
API נוח ונמשיך בעבודה נוספת.
כאן, נניח שמשתמשים רק בעובד בודד כדי לטעון ולהמשיך את ההדרכה, ובמקרה זה אינך קורא tf.keras.models.load_model
בתוך strategy.scope()
אחר (שים לב ש- strategy = tf.distribute.MultiWorkerMirroredStrategy()
, כפי שהוגדר קודם לכן ):
loaded_model = tf.keras.models.load_model(model_path)
# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2 20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773 <keras.callbacks.History at 0x7f6669989750>
שמירה ושחזור של מחסומים
מצד שני, המחסום מאפשר לשמור את המשקולות של הדגם שלך ולשחזר אותם ללא צורך לשמור את כל הדגם.
כאן תיצור tf.train.Checkpoint
אחד שעוקב אחר הדגם, המנוהל על ידי tf.train.CheckpointManager
, כך שרק נקודת הבידוק העדכנית נשמרת:
checkpoint_dir = '/tmp/ckpt'
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
לאחר הגדרת CheckpointManager
, אתה מוכן כעת לשמור ולהסיר את המחסומים שהעובדים הלא-ראשיים שמרו:
checkpoint_manager.save()
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(write_checkpoint_dir)
כעת, כאשר אתה צריך לשחזר את הדגם, אתה יכול למצוא את המחסום האחרון שנשמר באמצעות הפונקציה הנוחה tf.train.latest_checkpoint
. לאחר שחזור המחסום, ניתן להמשיך בהדרכה.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/2 2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. 20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938 <keras.callbacks.History at 0x7f6669589850>
BackupAndRestore התקשרות חוזרת
ה-callback tf.keras.callbacks.BackupAndRestore
מספק את פונקציונליות סובלנות התקלות על ידי גיבוי הדגם ומספר העידן הנוכחי בקובץ נקודת ביקורת זמני תחת ארגומנט backup_dir
ל- BackupAndRestore
. זה נעשה בסוף כל תקופה.
ברגע שעבודות מופרעות ומתחילות מחדש, ההתקשרות חזרה משחזרת את המחסום האחרון, והאימונים נמשכים מתחילת העידן שנקטע. כל אימון חלקי שכבר נעשה בעידן הבלתי נגמר לפני ההפרעה ייזרק, כך שהוא לא ישפיע על מצב הדגם הסופי.
כדי להשתמש בו, ספק מופע של tf.keras.callbacks.BackupAndRestore
בשיחת Model.fit
.
עם MultiWorkerMirroredStrategy
, אם עובד מופרע, האשכול כולו עוצר עד שהעובד שהופסק יופעל מחדש. עובדים אחרים יתחילו גם הם, והעובד שהופסק מצטרף מחדש לאשכול. לאחר מכן, כל עובד קורא את קובץ המחסום שנשמר בעבר וקולט את מצבו הקודם, ובכך מאפשר לאשכול לחזור לסנכרון. לאחר מכן, האימונים נמשכים.
ההתקשרות חזרה של BackupAndRestore
משתמש ב- CheckpointManager
כדי לשמור ולשחזר את מצב האימון, מה שיוצר קובץ שנקרא checkpoint שעוקב אחר נקודות ביקורת קיימות יחד עם הקובץ האחרון. מסיבה זו, אין לעשות שימוש חוזר ב- backup_dir
לאחסון נקודות ביקורת אחרות כדי למנוע התנגשות שמות.
נכון לעכשיו, ה- BackupAndRestore
תומך בהדרכה של עובד יחיד ללא אסטרטגיה - MirroredStrategy
- והדרכה מרובת עובדים עם MultiWorkerMirroredStrategy
.
להלן שתי דוגמאות הן להכשרה מרובה עובדים והן להכשרה לעובד יחיד:
# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/3 70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123 Epoch 2/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509 Epoch 3/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614 <keras.callbacks.History at 0x7f6669555d90>
אם תבדוק את הספרייה של backup_dir
שציינת ב- BackupAndRestore
, ייתכן שתבחין בכמה קבצי נקודת ביקורת שנוצרו באופן זמני. קבצים אלה נחוצים לשחזור המופעים שאבדו בעבר, והם יוסרו על ידי הספרייה בסוף Model.fit
עם יציאה מוצלחת מההכשרה שלך.
משאבים נוספים
- המדריך המבוזר ב-TensorFlow מספק סקירה כללית של אסטרטגיות ההפצה הזמינות.
- לולאת האימון המותאמת אישית עם Keras ו-MultiWorkerMirroredStrategy מראה כיצד להשתמש ב-
MultiWorkerMirroredStrategy
עם Keras ובלולאת אימון מותאמת אישית. - בדוק את הדגמים הרשמיים , שרבים מהם ניתנים להגדרה להפעלת אסטרטגיות הפצה מרובות.
- המדריך Better Performance with tf.function מספק מידע על אסטרטגיות וכלים אחרים, כגון TensorFlow Profiler שאתה יכול להשתמש בו כדי לייעל את הביצועים של דגמי TensorFlow שלך.