הצג באתר TensorFlow.org | הפעל בגוגל קולאב | צפה במקור ב-GitHub | הורד מחברת |
סקירה כללית
אימון שרת פרמטרים הוא שיטה נפוצה מקבילה לנתונים להגדלת הדרכת מודלים במספר מכונות.
אשכול הדרכה לשרת פרמטרים מורכב מעובדים ושרתי פרמטרים . משתנים נוצרים בשרתי פרמטרים והם נקראים ומתעדכנים על ידי העובדים בכל שלב. כברירת מחדל, עובדים קוראים ומעדכנים משתנים אלה באופן עצמאי מבלי לסנכרן זה עם זה. זו הסיבה שלפעמים אימון בסגנון שרת פרמטר נקרא אימון אסינכרוני .
ב-TensorFlow 2, הכשרת שרת פרמטרים מופעלת על ידי מחלקה tf.distribute.experimental.ParameterServerStrategy
, אשר מפיצה את שלבי ההדרכה לאשכול המתרחב לאלפי עובדים (בליווי שרתי פרמטרים).
שיטות אימון נתמכות
ישנן שתי שיטות אימון עיקריות נתמכות:
- ה-API של Keras
Model.fit
, שמומלץ כאשר אתם מעדיפים הפשטה וטיפול בהדרכה ברמה גבוהה. - לולאת אימון מותאמת אישית (ניתן להתייחס לאימון מותאם אישית , כתיבת לולאת אימון מאפס ולופ אימון מותאם אישית עם Keras ו-MultiWorkerMirroredStrategy לפרטים נוספים.) אימון לולאה מותאם אישית מומלץ כאשר אתה מעדיף להגדיר את הפרטים של לולאת האימון שלהם.
אשכול עם משרות ומשימות
ללא קשר ל-API הנבחר ( Model.fit
או לולאת הדרכה מותאמת אישית), הכשרה מבוזרת ב-TensorFlow 2 כוללת: 'cluster'
עם מספר 'jobs'
, ולכל אחת מהעבודות עשויות להיות 'tasks'
אחת או יותר.
בעת שימוש באימון שרת פרמטרים, מומלץ להצטייד ב:
- תפקיד רכז אחד (שיש לו את שם התפקיד
chief
) - משרות עובדים מרובות (
worker
שם עבודה); ו - משימות שרת פרמטרים מרובות (שם עבודה
ps
)
בזמן שהרכז יוצר משאבים, שולח משימות הדרכה, כותב מחסומים ומטפל בתקלות משימות, עובדים ושרתי פרמטרים מפעילים את tf.distribute.Server
שמקשיבים לבקשות מהרכז.
אימון שרת פרמטרים עם Model.fit
API
אימון שרת פרמטרים עם ה-API של Model.fit
מחייב את המתאם להשתמש באובייקט tf.distribute.experimental.ParameterServerStrategy
, ו- tf.keras.utils.experimental.DatasetCreator
כקלט. בדומה לשימוש ב- Model.fit
ללא אסטרטגיה, או עם אסטרטגיות אחרות, זרימת העבודה כוללת יצירה והידור של המודל, הכנת החזרות, ולאחר מכן קריאת Model.fit
.
אימון שרת פרמטרים עם לולאת אימון מותאמת אישית
עם לולאות אימון מותאמות אישית, כיתת tf.distribute.experimental.coordinator.ClusterCoordinator
היא מרכיב המפתח המשמש את הרכז.
- הכיתה
ClusterCoordinator
צריכה לעבוד בשילוב עם אובייקטtf.distribute.Strategy
. - אובייקט
tf.distribute.Strategy
זה נחוץ כדי לספק את המידע של האשכול ומשמש להגדרת שלב הדרכה, כפי שהודגם באימון מותאם אישית עם tf.distribute.Strategy . - לאחר מכן, האובייקט
ClusterCoordinator
שולח את ביצוע שלבי ההדרכה הללו לעובדים מרוחקים. - לצורך הכשרת שרת פרמטרים,
ClusterCoordinator
צריך לעבוד עםtf.distribute.experimental.ParameterServerStrategy
.
ה-API החשוב ביותר שסופק על ידי אובייקט ClusterCoordinator
הוא schedule
:
- ה-API של
schedule
מעמיד בתור פונקציה tf. ומחזיר מידtf.function
דמויRemoteValue
. - הפונקציות בתור יישלחו לעובדים מרוחקים בשרשורי רקע וה-
RemoteValue
שלהם יתמלאו באופן אסינכרוני. - מאחר
schedule
אינו מצריך הקצאת עובד, ניתן לבצע אתtf.function
. המועברת על כל עובד זמין. - אם העובד שעליו הוא מבוצע לא יהיה זמין לפני השלמתו, הפונקציה תבוצע שוב על עובד זמין אחר.
- בגלל עובדה זו והעובדה שביצוע פונקציה אינו אטומי, פונקציה עשויה להתבצע יותר מפעם אחת.
בנוסף לשליחת פונקציות מרחוק, ClusterCoordinator
עוזר גם ליצור מערכי נתונים על כל העובדים ולבנות מחדש את מערכי הנתונים הללו כאשר עובד מתאושש מכשל.
הגדרת הדרכה
המדריך יסתעף למסלולי לולאת הדרכה Model.fit
והתאמה אישית, ותוכל לבחור את זה שמתאים לצרכים שלך. סעיפים שאינם "אימון עם X" חלים על שני המסלולים.
pip install portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
הגדרת אשכול
כפי שצוין לעיל, אשכול אימון שרת פרמטרים דורש משימת מתאם שמפעילה את תוכנית ההדרכה שלך, עובד אחד או כמה משימות שרת פרמטרים המריצים שרתי TensorFlow— tf.distribute.Server
— ואולי משימת הערכה נוספת שמפעילה הערכת רכב צדדי. (ראה סעיף הערכת רכב צד להלן). הדרישות להגדרתם הן:
- משימת הרכז צריכה לדעת את הכתובות והיציאות של כל שרתי TensorFlow האחרים מלבד המעריך.
- העובדים ושרתי הפרמטרים צריכים לדעת לאיזו יציאה הם צריכים להאזין. למען הפשטות, אתה יכול בדרך כלל להעביר את מידע האשכול המלא בעת יצירת שרתי TensorFlow במשימות אלו.
- משימת המעריך לא חייבת לדעת את ההגדרה של אשכול ההדרכה. אם כן, הוא לא צריך לנסות להתחבר לאשכול ההדרכה.
- עובדים ושרתי פרמטרים צריכים להיות בעלי סוגי משימות כמו
"worker"
ו-"ps"
, בהתאמה. על הרכז להשתמש"chief"
כסוג המשימה מסיבות מדור קודם.
במדריך זה, תיצור אשכול בתהליך כך שניתן יהיה להריץ את כל אימון שרת הפרמטרים ב-Colab. תלמד כיצד להגדיר אשכולות אמיתיים בחלק מאוחר יותר.
אשכול בתהליך
תתחיל ביצירת מספר שרתי TensorFlow מראש ותתחבר אליהם מאוחר יותר. שימו לב שזה רק למטרת הדגמה של הדרכה זו, ובאימון אמיתי השרתים יופעלו על מכונות "worker"
ו- "ps"
.
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name="worker",
task_index=i,
config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec,
job_name="ps",
task_index=i,
protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
הגדרת האשכול בתהליך משמשת לעתים קרובות בבדיקות יחידות, כגון כאן .
אפשרות נוספת לבדיקה מקומית היא להפעיל תהליכים במכונה המקומית - בדוק את הדרכה של Multi-worker עם Keras לקבלת דוגמה לגישה זו.
יצירת ParameterServerStrategy
לפני שאתם צוללים לתוך קוד האימון, בואו ניצור אובייקט של ParameterServerStrategy
. שים לב שזה נחוץ ללא קשר אם אתה ממשיך עם Model.fit
או לולאת אימון מותאמת אישית. הארגומנט variable_partitioner
יוסבר בסעיף ריסוק משתנה .
variable_partitioner = (
tf.distribute.experimental.partitioners.MinSizePartitioner(
min_shard_bytes=(256 << 10),
max_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0' INFO:tensorflow:Number of GPUs on workers: 1
על מנת להשתמש ב-GPUs להדרכה, הקצו גרפי GPU גלויים לכל עובד. ParameterServerStrategy
ישתמש בכל ה-GPUs הזמינים בכל עובד, עם ההגבלה שלכל העובדים יהיה אותו מספר של GPUs זמין.
ריסוק משתנה
ריסוק משתנים מתייחס לפיצול משתנה למספר משתנים קטנים יותר, הנקראים רסיסים . פיצול משתנים עשוי להיות שימושי כדי להפיץ את עומס הרשת בעת גישה לרסיסים אלה. זה גם שימושי כדי להפיץ חישוב ואחסון של משתנה רגיל על פני שרתי פרמטרים מרובים.
כדי לאפשר פיצול משתנים, אתה יכול להעביר ב- variable_partitioner
בעת בניית אובייקט ParameterServerStrategy
. ה- variable_partitioner
יופעל בכל פעם כאשר משתנה נוצר והוא צפוי להחזיר את מספר הרסיסים לאורך כל מימד של המשתנה. חלק מ- variable_partitioner
מחוץ לקופסה מסופקים כגון tf.distribute.experimental.partitioners.MinSizePartitioner
. מומלץ להשתמש במחיצות מבוססות גודל כמו tf.distribute.experimental.partitioners.MinSizePartitioner
כדי להימנע מחלוקת משתנים קטנים, שעלולים להשפיע לרעה על מהירות אימון המודל.
כאשר variable_partitioner
מועברים ואם אתה יוצר משתנה ישירות תחת strategy.scope()
, הוא יהפוך לסוג מיכל עם מאפיין variables
המספק גישה לרשימת הרסיסים. ברוב המקרים, מיכל זה יומר אוטומטית לטנזור על ידי שרשור כל הרסיסים. כתוצאה מכך, ניתן להשתמש בו כמשתנה נורמלי. מצד שני, חלק משיטות TensorFlow כמו tf.nn.embedding_lookup
מספקות הטמעה יעילה עבור סוג מיכל זה ובשיטות אלו תימנעה שרשור אוטומטי.
אנא עיין במסמכי ה-API של tf.distribute.experimental.ParameterServerStrategy
לפרטים נוספים.
אימון עם Model.fit
Keras מספקת API קל לשימוש לאימון דרך Model.fit
המטפל בלולאת האימון שמתחת למכסה המנוע, עם הגמישות של train_step
, והתקשרויות חוזרות, המספקות פונקציונליות כמו שמירת נקודות ביקורת או שמירת סיכום עבור TensorBoard. עם Model.fit
, ניתן להשתמש באותו קוד אימון עבור אסטרטגיות אחרות עם החלפה פשוטה של אובייקט האסטרטגיה.
קלט נתונים
Model.fit
עם אימון שרת פרמטרים מחייב את נתוני הקלט להיות מסופקים ב-callable שלוקח ארגומנט בודד מסוג tf.distribute.InputContext
, ומחזיר tf.data.Dataset
. לאחר מכן, צור אובייקט tf.keras.utils.experimental.DatasetCreator
שלוקח אובייקט קריא כזה, tf.distribute.InputOptions
callable
באמצעות ארגומנט input_options
.
שימו לב שמומלץ לערבב ולחזור על הנתונים עם אימון שרת פרמטרים, ולציין steps_per_epoch
ב- fit
call כדי שהספרייה תדע את גבולות העידן.
אנא עיין במדריך הקלט המבוזר למידע נוסף על הארגומנט InputContext
.
def dataset_fn(input_context):
global_batch_size = 64
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.shard(
input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2)
return dataset
dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
הקוד ב- dataset_fn
יופעל בהתקן הקלט, שהוא בדרך כלל ה-CPU, בכל אחת ממכונות העבודה.
בניית מודל והידור
כעת, תיצור tf.keras.Model
— מודל tf.keras.models.Sequential
טריוויאלי למטרות הדגמה — ואחריו קריאה Model.compile
לשילוב רכיבים, כגון מייעל, מדדים או פרמטרים כגון steps_per_execution
:
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)
התקשרות והדרכה
לפני שאתה מתקשר ל- model.fit
לאימון בפועל, בואו נכין את ההתקשרות הנחוצות למשימות נפוצות, כגון:
-
ModelCheckpoint
: כדי לשמור את משקלי הדגם. -
BackupAndRestore
: כדי לוודא שהתקדמות האימון מגובת אוטומטית, ומשוחזרת אם האשכול חווה חוסר זמינות (כגון הפסקה או מניעת היעדרות); אוֹ -
TensorBoard
: כדי לשמור את דוחות ההתקדמות בקבצי סיכום, המוצגים בכלי TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=log_dir),
tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step Epoch 2/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step Epoch 3/5 WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. 20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step Epoch 4/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step Epoch 5/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step <keras.callbacks.History at 0x7f89984ca890>
שימוש ישיר עם ClusterCoordinator
(אופציונלי)
גם אם תבחר בנתיב ההדרכה Model.fit
, אתה יכול לחלופין ליצור אובייקט tf.distribute.experimental.coordinator.ClusterCoordinator
כדי לתזמן פונקציות אחרות שאתה רוצה שיבוצעו על העובדים. עיין בסעיף אימון עם לולאת אימון מותאמת אישית לפרטים נוספים ודוגמאות.
אימון עם לולאת אימון מותאמת אישית
שימוש בלולאות אימון מותאמות אישית עם tf.distribute.Strategy
מספק גמישות רבה להגדרת לולאות אימון. עם ParameterServerStrategy
שהוגדרה לעיל ( strategy
), תשתמש ב- tf.distribute.experimental.coordinator.ClusterCoordinator
כדי לשלוח את ביצוע שלבי ההדרכה לעובדים מרוחקים.
לאחר מכן, תיצור מודל, תגדיר מערך נתונים ופונקציית צעד, כפי שעשית בלולאת האימון עם s tf.distribute.Strategy
אחרים. תוכל למצוא פרטים נוספים בהדרכה מותאמת אישית עם tf.distribute.Strategy .
כדי להבטיח שליפה מוקדמת של מערך נתונים יעילה, השתמש בממשקי API ליצירת מערך נתונים מבוזר המומלצים המוזכרים בסעיף שלבי הדרכה לשיגור לעובדים מרוחקים להלן. כמו כן, הקפד להתקשר ל- Strategy.run
בתוך worker_fn
כדי לנצל את מלוא ה-GPUs שהוקצו לעובדים. שאר השלבים זהים לאימון עם או בלי GPUs.
בואו ניצור את הרכיבים האלה בשלבים הבאים:
הגדר את הנתונים
ראשית, כתוב פונקציה שיוצרת מערך נתונים הכולל לוגיקת עיבוד מקדים המיושמת על ידי שכבות עיבוד מוקדם של Keras .
אתה תיצור את השכבות האלה מחוץ ל- dataset_fn
אבל תחיל את הטרנספורמציה בתוך ה- dataset_fn
, מכיוון שאתה תעטוף את dataset_fn
לתוך tf.function
, שלא מאפשרת ליצור משתנים בתוכו.
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=feature_vocab,
mask_token=None)
label_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = tf.keras.layers.Input(
shape=(3,),
dtype=tf.string,
name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = tf.keras.Model(
{"features": raw_feature_input},
feature_id_input)
raw_label_input = tf.keras.layers.Input(
shape=(1,),
dtype=tf.string,
name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = tf.keras.Model(
{"label": raw_label_input},
label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison return bool(asarray(a1 == a2).all())
צור דוגמאות צעצועים במערך נתונים:
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
לאחר מכן, צור את מערך ההדרכה עטוף ב- dataset_fn
:
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
בנה את הדגם
לאחר מכן, צור את המודל ואובייקטים אחרים. הקפד ליצור את כל המשתנים תחת strategy.scope
.
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with Keras processing layers.
model_input = tf.keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = tf.keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
model = tf.keras.Model({"features": model_input}, dense_output)
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
accuracy = tf.keras.metrics.Accuracy()
בואו נאשר שהשימוש ב- FixedShardsPartitioner
פיצל את כל המשתנים לשני רסיסים וכל רסיס הוקצה לשרתי פרמטרים שונים:
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
הגדר את שלב האימון
שלישית, צור את שלב האימון עטוף ב- tf.function
:
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
בפונקציית שלב האימון לעיל, קריאה ל- Strategy.run
ו- Strategy.reduce
ב- step_fn
יכולה לתמוך במספר GPUs לכל עובד. אם לעובדים הוקצו GPUs, Strategy.run
יפיץ את מערכי הנתונים על מספר העתקים.
שליחת שלבי הדרכה לעובדים מרוחקים
לאחר שכל החישובים יוגדרו על ידי ParameterServerStrategy
, תשתמש בכיתה tf.distribute.experimental.coordinator.ClusterCoordinator
כדי ליצור משאבים ולהפיץ את שלבי ההדרכה לעובדים מרוחקים.
בואו ניצור תחילה אובייקט ClusterCoordinator
ונעביר את אובייקט האסטרטגיה:
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
לאחר מכן, צור מערך נתונים לכל עובד ואיטרטור. ב- per_worker_dataset_fn
למטה, מומלץ לעטוף את dataset_fn
לתוך strategy.distribute_datasets_from_function
כדי לאפשר שליפה יעילה מראש למעבדי GPU בצורה חלקה.
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
השלב האחרון הוא להפיץ את החישוב לעובדים מרוחקים באמצעות ClusterCoordinator.schedule
:
- שיטת
schedule
מעמידה בתור פונקציה tf. ומחזירה מידtf.function
דמויRemoteValue
. הפונקציות בתור יישלחו לעובדים מרוחקים בשרשורי רקע וה-RemoteValue
יתמלא באופן אסינכרוני. - ניתן להשתמש בשיטת
join
(ClusterCoordinator.join
) כדי להמתין עד לביצוע כל הפונקציות המתוזמנות.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). Finished epoch 0, accuracy is 0.543750. Finished epoch 1, accuracy is 0.543750. Finished epoch 2, accuracy is 0.950000. Finished epoch 3, accuracy is 1.000000.
כך תוכל להביא את התוצאה של RemoteValue
:
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000
לחלופין, אתה יכול להפעיל את כל השלבים ולעשות משהו בזמן ההמתנה להשלמה:
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
לקבלת זרימת העבודה המלאה של ההדרכה וההגשה עבור דוגמה מסוימת זו, בדוק את המבחן הזה.
עוד על יצירת מערך נתונים
מערך הנתונים בקוד לעיל נוצר באמצעות ה-API של ClusterCoordinator.create_per_worker_dataset
). הוא יוצר מערך נתונים אחד לכל עובד ומחזיר אובייקט מיכל. אתה יכול לקרוא לשיטת iter
על זה כדי ליצור איטרטור לכל עובד. האיטרטור לכל עובד מכיל איטרטור אחד לכל עובד והחתך המתאים של עובד יוחלף בארגומנט הקלט של הפונקציה המועברת לשיטת ClusterCoordinator.schedule
לפני ביצוע הפונקציה על עובד מסוים.
נכון לעכשיו, שיטת ClusterCoordinator.schedule
מניחה שהעובדים שווים, ולכן מניחה שמערכי הנתונים של עובדים שונים זהים, אלא שהם עשויים להיות ערבובים בצורה שונה אם הם מכילים פעולת Dataset.shuffle
. בשל כך, מומלץ גם לחזור על מערכי הנתונים ללא הגבלת זמן ולתזמן מספר סופי של שלבים במקום להסתמך על OutOfRangeError
ממערך נתונים.
הערה חשובה נוספת היא tf.data
נתונים של tf.data אינם תומכים בסריאליזציה מרומזת וסידריאליזציה על פני גבולות המשימות. לכן חשוב ליצור את כל מערך הנתונים בתוך הפונקציה המועברת ל- ClusterCoordinator.create_per_worker_dataset
.
הַעֲרָכָה
יש יותר מדרך אחת להגדיר ולהפעיל לולאת הערכה בהדרכה מבוזרת. לכל אחד יש יתרונות וחסרונות משלו כפי שמתואר להלן. שיטת ההערכה המוטבעת מומלצת אם אין לך העדפה.
הערכה מוטבעת
בשיטה זו, הרכז מחליף בין אימון להערכה וכך היא נקראת הערכה מוטבעת .
ישנם מספר יתרונות של הערכה מוטבעת. לדוגמה:
- זה יכול לתמוך במודלים גדולים של הערכה ובמערכי נתונים של הערכה שמשימה אחת לא יכולה להחזיק.
- ניתן להשתמש בתוצאות ההערכה כדי לקבל החלטות לאימון העידן הבא.
ישנן שתי דרכים ליישם הערכה מוטבעת: הערכה ישירה והערכה מבוזרת.
- הערכה ישירה : עבור מודלים קטנים ומערכי הערכה, הרכז יכול להפעיל הערכה ישירות על המודל המבוזר עם מערך ההערכה על הרכז:
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = tf.keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
- הערכה מבוזרת : עבור מודלים או מערכי נתונים גדולים שלא ניתן להפעיל ישירות על הרכז, משימת הרכז יכולה להפיץ משימות הערכה לעובדים באמצעות שיטות
ClusterCoordinator.schedule
/ClusterCoordinator.join
:
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = tf.keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources Evaluation accuracy: 1.000000
הערכת רכב צד
שיטה נוספת נקראת הערכת מכוניות צד, שבה אתה יוצר משימת מעריך ייעודית שקוראת שוב ושוב מחסומים ומפעילה הערכה על מחסום אחרון. זה מאפשר לתוכנית האימונים שלך להסתיים מוקדם אם אינך צריך לשנות את לולאת האימון שלך בהתבסס על תוצאות הערכה. עם זאת, זה דורש משימת מעריך נוספת ובדיקה תקופתית כדי להפעיל הערכה. להלן לולאת הערכה אפשרית של רכב צד:
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epoches)):
break
אשכולות בעולם האמיתי
בסביבת ייצור אמיתית, תפעיל את כל המשימות בתהליכים שונים על מכונות שונות. הדרך הפשוטה ביותר להגדיר מידע על אשכולות בכל משימה היא להגדיר משתני סביבה "TF_CONFIG"
ולהשתמש ב- tf.distribute.cluster_resolver.TFConfigClusterResolver
כדי לנתח את "TF_CONFIG"
.
לתיאור כללי על משתני סביבה "TF_CONFIG"
, עיין במדריך ההדרכה המבוזר .
אם אתה מתחיל את משימות ההדרכה שלך באמצעות Kubernetes או תבניות תצורה אחרות, סביר מאוד להניח שתבניות אלו כבר הגדירו “TF_CONFIG"
.
הגדר את משתנה הסביבה "TF_CONFIG"
נניח שיש לך 3 עובדים ו-2 שרתי פרמטרים, ה- "TF_CONFIG"
של עובד 1 יכול להיות:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
ה- "TF_CONFIG"
של המעריך יכול להיות:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
החלק "cluster"
במחרוזת "TF_CONFIG"
לעיל עבור המעריך הוא אופציונלי.
אם אתה משתמש באותו בינארי עבור כל המשימות
אם אתה מעדיף להפעיל את כל המשימות הללו באמצעות קובץ בינארי אחד, תצטרך לתת לתוכנית שלך להסתעף לתפקידים שונים כבר בהתחלה:
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# Run side-car evaluation
else:
# Run the coordinator.
הקוד הבא מפעיל שרת TensorFlow וממתין:
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
טיפול בכשל במשימה
כישלון עובד
tf.distribute.experimental.coordinator.ClusterCoordinator
או Model.fit
מספקים סובלנות מובנית לתקלות עבור כשל עובד. לאחר שחזור עובד, פונקציית הנתונים שסופקו בעבר (או ל- ClusterCoordinator.create_per_worker_dataset
עבור לולאת אימון מותאמת אישית, או tf.keras.utils.experimental.DatasetCreator
for Model.fit
) תופעל על העובדים כדי ליצור מחדש את מערכי הנתונים.
כשל בשרת פרמטר או ברכז
עם זאת, כאשר המתאם יראה שגיאת שרת פרמטר, הוא יעלה מיידית UnavailableError
או AbortedError
. אתה יכול להפעיל מחדש את הרכז במקרה זה. גם הרכז עצמו עלול להיות בלתי זמין. לכן, מומלץ להשתמש בכלים מסוימים כדי לא לאבד את התקדמות האימון:
עבור
Model.fit
, עליך להשתמש בהתקשרות חוזרת שלBackupAndRestore
, המטפלת בשמירת ההתקדמות ובשחזור אוטומטית. עיין בסעיף התקשרויות והדרכה למעלה לדוגמא.עבור לולאת אימון מותאמת אישית, עליך לבדוק את משתני המודל מעת לעת ולטעון משתני מודל מנקודת ביקורת, אם ישנה, לפני תחילת האימון. ניתן להסיק את התקדמות האימון בקירוב מ-
optimizer.iterations
אם יש נקודת ביקורת לאופטימיזציה:
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epoches):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
שליפת ערך RemoteValue
RemoteValue
מובטחת שתצליח אם פונקציה מבוצעת בהצלחה. הסיבה לכך היא שכרגע ערך ההחזרה מועתק מיד לרכז לאחר ביצוע פונקציה. אם יש כשל של עובד במהלך ההעתקה, הפונקציה תתנסה שוב בעובד זמין אחר. לכן, אם ברצונך לבצע אופטימיזציה לביצועים, תוכל לתזמן פונקציות ללא ערך החזרה.
דיווח שגיאות
ברגע שהמתאם יראה שגיאה כגון UnavailableError
משרתי פרמטרים או שגיאות יישום אחרות כגון InvalidArgument
מ- tf.debugging.check_numerics
, הוא יבטל את כל הפונקציות הממתינות והתורן לפני העלאת השגיאה. RemoteValue
התואמים שלהם תעלה שגיאת CancelledError
.
לאחר העלאת שגיאה, הרכז לא יעלה את אותה שגיאה או כל שגיאה מפונקציות שבוטלו.
שיפור ביצועים
ישנן מספר סיבות אפשריות אם אתה רואה בעיות בביצועים כאשר אתה מתאמן עם ParameterServerStrategy
ו- ClusterResolver
.
אחת הסיבות הנפוצות היא שלשרתי פרמטרים יש עומס לא מאוזן וכמה שרתי פרמטרים בעלי עומסים כבדים הגיעו לקיבולת. יכולות להיות גם סיבות שורש מרובות. כמה שיטות פשוטות למתן בעיה זו הן:
- חלק את משתני המודל הגדולים שלך באמצעות ציון משתני_מחיצת
variable_partitioner
בעת בנייתParameterServerStrategy
. - הימנע מיצירת משתנה נקודה חמה שנדרש על ידי כל שרתי הפרמטרים בשלב אחד, אם אפשר. לדוגמה, השתמש בקצב למידה קבוע או תת-מחלקה
tf.keras.optimizers.schedules.LearningRateSchedule
באופטימייזרים שכן התנהגות ברירת המחדל היא שקצב הלמידה יהפוך למשתנה המוצב בשרת פרמטר מסוים ומתבקש על ידי כל שאר שרתי הפרמטרים בכל שלב . - ערבב את אוצר המילים הגדולים שלך לפני שתעביר אותם לשכבות העיבוד המקדים של Keras.
סיבה אפשרית נוספת לבעיות ביצועים היא הרכז. ההטמעה הראשונה שלך של schedule
/ join
היא מבוססת Python ולכן עשויה להיות תקורה של שרשור. כמו כן, ההשהיה בין הרכז לעובדים יכולה להיות גדולה. אם זה המקרה,
עבור
Model.fit
, אתה יכול להגדיר ארגומנטsteps_per_execution
המסופק ב-Model.compile
לערך גדול מ-1.עבור לולאת אימון מותאמת אישית, אתה יכול לארוז שלבים מרובים לתוך
tf.function
יחיד:
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
ככל שהספרייה עוברת אופטימיזציה נוספת, אני מקווה שרוב המשתמשים לא יצטרכו לארוז ידנית שלבים בעתיד.
בנוסף, טריק קטן לשיפור ביצועים הוא לתזמן פונקציות ללא ערך החזרה כפי שהוסבר בסעיף כשל במשימה שלמעלה.
מגבלות ידועות
רוב המגבלות הידועות כבר מכוסות בסעיפים לעיל. סעיף זה מספק תקציר.
ParameterServerStrategy
כללי
-
os.environment["grpc_fail_fast"]="use_caller"
בכל משימה, כולל הרכז, כדי לגרום לסובלנות לתקלות לפעול כהלכה. - אין תמיכה באימון שרת פרמטר סינכרוני.
- בדרך כלל יש צורך לארוז מספר שלבים לפונקציה אחת כדי להשיג ביצועים מיטביים.
- אין תמיכה בטעינת saved_model דרך
tf.saved_model.load
המכיל משתנים מפוצלים. שים לב שטעינת saved_model כזה באמצעות TensorFlow Serving צפויה לעבוד. - אין תמיכה בטעינת נקודת ביקורת המכילה משתני חריצי אופטימיזציה מפוצלים למספר שונה של רסיסים.
- אין תמיכה בהתאוששות מכשל בשרת פרמטרים מבלי להפעיל מחדש את משימת הרכז.
- השימוש ב-
tf.lookup.StaticHashTable
(המועסק בדרך כלל על ידי כמה שכבות עיבוד מקדים של Keras, כגוןtf.keras.layers.IntegerLookup
,tf.keras.layers.StringLookup
ו-tf.keras.layers.TextVectorization
) מביא למשאבים המוצבים ב- הרכז בשלב זה עם הכשרת שרת פרמטרים. יש לכך השלכות ביצועים על חיפוש RPCs מהעובדים לרכז. זוהי עדיפות גבוהה כיום לטיפול.
פרטי Model.fit
- נדרש ארגומנט
steps_per_epoch
ב-Model.fit
. אתה יכול לבחור ערך המספק מרווחים מתאימים בתקופה. - ל-
ParameterServerStrategy
אין תמיכה בהתקשרויות חוזרות מותאמות אישית שיש להן קריאות ברמת אצווה מטעמי ביצועים. אתה צריך להמיר את הקריאות האלה לקריאות ברמת עידן עםsteps_per_epoch
שנבחרו כהלכה, כך שהם נקראים בכל מספרsteps_per_epoch
-תקופה של צעדים. שיחות חוזרות מובנות אינן מושפעות: השיחות ברמת האצווה שלהם שונו לביצועים. תמיכה בקריאות ברמת אצווה עבורParameterServerStrategy
מתוכננת. - מאותה סיבה, בניגוד לאסטרטגיות אחרות, סרגל ההתקדמות והמדדים מתועדים רק בגבולות עידן.
-
run_eagerly
אינו נתמך.
פרטים על לולאת אימון מותאמת אישית
-
ClusterCoordinator.schedule
אינו תומך בהבטחות ביקור עבור מערך נתונים. - כאשר נעשה שימוש
ClusterCoordinator.create_per_worker_dataset
, יש ליצור את כל מערך הנתונים בתוך הפונקציה המועברת אליו. - המערכת מתעלמת
tf.data.Options
במערך נתונים שנוצר על ידיClusterCoordinator.create_per_worker_dataset
.