הצג באתר TensorFlow.org | הפעל בגוגל קולאב | צפה במקור ב-GitHub | הורד מחברת |
סקירה כללית
מדריך זה מדגים כיצד ניתן להשתמש ב- tf.distribute.Strategy
מבוזרת מרובת עובדים עם tf.estimator
. אם אתה כותב את הקוד שלך באמצעות tf.estimator
, ואתה מעוניין לבצע קנה מידה מעבר למכונה בודדת עם ביצועים גבוהים, המדריך הזה הוא בשבילך.
לפני שמתחילים, אנא קרא את מדריך אסטרטגיית ההפצה . ערכת ההדרכה מרובת GPU רלוונטית גם היא, מכיוון שהדרכה זו משתמשת באותו דגם.
להכין
ראשית, הגדר את TensorFlow ואת הייבוא הדרוש.
import tensorflow_datasets as tfds
import tensorflow as tf
import os, json
tf.compat.v1.disable_eager_execution()
פונקציית קלט
מדריך זה משתמש במערך הנתונים של MNIST מ- TensorFlow Datasets . הקוד כאן דומה להדרכה של ריבוי-GPU עם הבדל מרכזי אחד: בעת שימוש באומדן להדרכה מרובת עובדים, יש צורך לגזור את מערך הנתונים לפי מספר העובדים כדי להבטיח התכנסות מודל. נתוני הקלט מחולקים לפי אינדקס עובד, כך שכל עובד מעבד 1/num_workers
חלקים נפרדים של מערך הנתונים.
BUFFER_SIZE = 10000
BATCH_SIZE = 64
def input_fn(mode, input_context=None):
datasets, info = tfds.load(name='mnist',
with_info=True,
as_supervised=True)
mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
datasets['test'])
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label
if input_context:
mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
גישה סבירה נוספת להשגת התכנסות תהיה ערבוב מערך הנתונים עם זרעים נפרדים בכל עובד.
תצורה של ריבוי עובדים
אחד ההבדלים העיקריים במדריך זה (בהשוואה להדרכה מרובת GPU ) הוא ההגדרה של ריבוי עובדים. משתנה הסביבה TF_CONFIG
הוא הדרך הסטנדרטית לציין את תצורת האשכול לכל עובד שהוא חלק מהאשכול.
ישנם שני מרכיבים של TF_CONFIG
: cluster
task
. cluster
מספק מידע על האשכול כולו, כלומר העובדים ושרתי הפרמטרים באשכול. task
מספקת מידע על המשימה הנוכחית. cluster
הרכיבים הראשון זהה עבור כל העובדים ושרתי הפרמטרים באשכול, task
הרכיב השני שונה בכל עובד ושרת פרמטרים ומציינת type
index
משלו. בדוגמה זו, type
המשימה הוא worker
index
המשימה הוא 0
.
למטרות המחשה, מדריך זה מראה כיצד להגדיר TF_CONFIG
עם 2 עובדים על localhost
. בפועל, תיצור מספר עובדים על כתובת IP ויציאה חיצונית, ותגדיר TF_CONFIG
על כל עובד בצורה מתאימה, כלומר משנה את index
המשימות .
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:12345", "localhost:23456"]
},
'task': {'type': 'worker', 'index': 0}
})
הגדירו את הדגם
כתוב את השכבות, האופטימיזציה ופונקציית ההפסד לאימון. מדריך זה מגדיר את המודל עם שכבות Keras, בדומה להדרכה של ריבוי GPU .
LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])
logits = model(features, training=False)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {'logits': logits}
return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)
optimizer = tf.compat.v1.train.GradientDescentOptimizer(
learning_rate=LEARNING_RATE)
loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(mode, loss=loss)
return tf.estimator.EstimatorSpec(
mode=mode,
loss=loss,
train_op=optimizer.minimize(
loss, tf.compat.v1.train.get_or_create_global_step()))
אסטרטגיית MultiWorkerMirrored
כדי לאמן את המודל, השתמש במופע של tf.distribute.experimental.MultiWorkerMirroredStrategy
. MultiWorkerMirroredStrategy
יוצר עותקים של כל המשתנים בשכבות המודל בכל מכשיר על פני כל העובדים. הוא משתמש ב- CollectiveOps
, אופציה של TensorFlow לתקשורת קולקטיבית, כדי לצבור שיפועים ולשמור את המשתנים מסונכרנים. במדריך tf.distribute.Strategy
יש פרטים נוספים על אסטרטגיה זו.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_7505/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version. Instructions for updating: use distribute.MultiWorkerMirroredStrategy instead INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO
אימון והערכת המודל
לאחר מכן, ציין את אסטרטגיית ההפצה ב- RunConfig
עבור האומד, ואמן והעריך על ידי הפעלת tf.estimator.train_and_evaluate
. מדריך זה מפיץ רק את ההדרכה על ידי ציון האסטרטגיה באמצעות train_distribute
. אפשר גם להפיץ את ההערכה דרך eval_distribute
.
config = tf.estimator.RunConfig(train_distribute=strategy)
classifier = tf.estimator.Estimator(
model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
classifier,
train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies. INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } , '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7f3404234490>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None} INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Running training and evaluation locally (non-distributed). INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/estimator.py:1244: StrategyBase.configure (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: use `update_config_proto` instead. INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy INFO:tensorflow:Calling model_fn. /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options. warnings.warn("To make it possible to preserve tf.data options across " INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Create CheckpointSaverHook. INFO:tensorflow:Create CheckpointSaverHook. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version. Instructions for updating: Use the iterator's `initializer` property instead. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version. Instructions for updating: Use the iterator's `initializer` property instead. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... 2022-01-26 05:29:43.503603: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} } . Registered: device='CPU' 2022-01-26 05:29:43.504873: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} } . Registered: device='CPU' INFO:tensorflow:loss = 2.292878, step = 0 INFO:tensorflow:loss = 2.292878, step = 0 INFO:tensorflow:global_step/sec: 173.275 INFO:tensorflow:global_step/sec: 173.275 INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec) INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec) INFO:tensorflow:global_step/sec: 189.057 INFO:tensorflow:global_step/sec: 189.057 INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec) INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec) INFO:tensorflow:global_step/sec: 193.075 INFO:tensorflow:global_step/sec: 193.075 INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec) INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec) INFO:tensorflow:global_step/sec: 199.957 INFO:tensorflow:global_step/sec: 199.957 INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec) INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec) INFO:tensorflow:global_step/sec: 204.217 INFO:tensorflow:global_step/sec: 204.217 INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec) INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec) INFO:tensorflow:global_step/sec: 201.747 INFO:tensorflow:global_step/sec: 201.747 INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec) INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec) INFO:tensorflow:global_step/sec: 206.079 INFO:tensorflow:global_step/sec: 206.079 INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec) INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec) INFO:tensorflow:global_step/sec: 231.299 INFO:tensorflow:global_step/sec: 231.299 INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec) INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec) INFO:tensorflow:global_step/sec: 657.044 INFO:tensorflow:global_step/sec: 657.044 INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec) INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec) INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938... INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938... INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938... INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938... INFO:tensorflow:Calling model_fn. INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56 INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56 INFO:tensorflow:Graph was finalized. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Running local_init_op. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Evaluation [10/100] INFO:tensorflow:Evaluation [10/100] INFO:tensorflow:Evaluation [20/100] INFO:tensorflow:Evaluation [20/100] INFO:tensorflow:Evaluation [30/100] INFO:tensorflow:Evaluation [30/100] INFO:tensorflow:Evaluation [40/100] INFO:tensorflow:Evaluation [40/100] INFO:tensorflow:Evaluation [50/100] INFO:tensorflow:Evaluation [50/100] INFO:tensorflow:Evaluation [60/100] INFO:tensorflow:Evaluation [60/100] INFO:tensorflow:Evaluation [70/100] INFO:tensorflow:Evaluation [70/100] INFO:tensorflow:Evaluation [80/100] INFO:tensorflow:Evaluation [80/100] INFO:tensorflow:Evaluation [90/100] INFO:tensorflow:Evaluation [90/100] INFO:tensorflow:Evaluation [100/100] INFO:tensorflow:Evaluation [100/100] INFO:tensorflow:Inference Time : 2.04637s INFO:tensorflow:Inference Time : 2.04637s INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58 INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58 INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131 INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Loss for final step: 1.10881. INFO:tensorflow:Loss for final step: 1.10881. ({'loss': 2.234131, 'global_step': 938}, [])
ייעול ביצועי האימון
כעת יש לך דגם ואומדן בעל יכולת ריבוי עובדים המופעל על ידי tf.distribute.Strategy
. אתה יכול לנסות את הטכניקות הבאות כדי לייעל את הביצועים של אימון מרובה עובדים:
- הגדל את גודל האצווה: גודל האצווה המצוין כאן הוא ל-GPU. באופן כללי, גודל האצווה הגדול ביותר שמתאים לזיכרון ה-GPU מומלץ.
- משתני Cast: העבר את המשתנים ל-
tf.float
אם אפשר. המודל הרשמי של ResNet כולל דוגמה כיצד ניתן לעשות זאת. השתמש בתקשורת קולקטיבית:
MultiWorkerMirroredStrategy
מספקת יישומי תקשורת קולקטיביים מרובים.-
RING
מיישמת קולקטיבים מבוססי טבעת המשתמשים ב-gRPC כשכבת התקשורת בין מארחים. -
NCCL
משתמש ב- NCCL של Nvidia כדי ליישם קולקטיבים. -
AUTO
דוחה את הבחירה לזמן הריצה.
הבחירה הטובה ביותר של הטמעה קולקטיבית תלויה במספר וסוג ה-GPUs, ובחיבור הרשת באשכול. כדי לעקוף את הבחירה האוטומטית, ציין ערך חוקי לפרמטר
communication
של הבנאי שלMultiWorkerMirroredStrategy
, למשלcommunication=tf.distribute.experimental.CollectiveCommunication.NCCL
.-
בקר בסעיף ביצועים במדריך כדי ללמוד עוד על אסטרטגיות וכלים אחרים שבהם אתה יכול להשתמש כדי לייעל את הביצועים של דגמי TensorFlow שלך.
דוגמאות קוד אחרות
- דוגמה מקצה לקצה להדרכה מרובת עובדים ב-tensorflow/אקולוגית באמצעות תבניות Kubernetes. דוגמה זו מתחילה במודל Keras וממירה אותו ל-Estimator באמצעות
tf.keras.estimator.model_to_estimator
API. - מודלים רשמיים , שרבים מהם יכולים להיות מוגדרים להפעלת אסטרטגיות הפצה מרובות.