הכשרה מרובת עובדים עם Estimator

הצג באתר TensorFlow.org הפעל בגוגל קולאב צפה במקור ב-GitHub הורד מחברת

סקירה כללית

מדריך זה מדגים כיצד ניתן להשתמש ב- tf.distribute.Strategy מבוזרת מרובת עובדים עם tf.estimator . אם אתה כותב את הקוד שלך באמצעות tf.estimator , ואתה מעוניין לבצע קנה מידה מעבר למכונה בודדת עם ביצועים גבוהים, המדריך הזה הוא בשבילך.

לפני שמתחילים, אנא קרא את מדריך אסטרטגיית ההפצה . ערכת ההדרכה מרובת GPU רלוונטית גם היא, מכיוון שהדרכה זו משתמשת באותו דגם.

להכין

ראשית, הגדר את TensorFlow ואת הייבוא ​​הדרוש.

import tensorflow_datasets as tfds
import tensorflow as tf

import os, json
tf.compat.v1.disable_eager_execution()

פונקציית קלט

מדריך זה משתמש במערך הנתונים של MNIST מ- TensorFlow Datasets . הקוד כאן דומה להדרכה של ריבוי-GPU עם הבדל מרכזי אחד: בעת שימוש באומדן להדרכה מרובת עובדים, יש צורך לגזור את מערך הנתונים לפי מספר העובדים כדי להבטיח התכנסות מודל. נתוני הקלט מחולקים לפי אינדקס עובד, כך שכל עובד מעבד 1/num_workers חלקים נפרדים של מערך הנתונים.

BUFFER_SIZE = 10000
BATCH_SIZE = 64

def input_fn(mode, input_context=None):
  datasets, info = tfds.load(name='mnist',
                                with_info=True,
                                as_supervised=True)
  mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
                   datasets['test'])

  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255
    return image, label

  if input_context:
    mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
                                        input_context.input_pipeline_id)
  return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

גישה סבירה נוספת להשגת התכנסות תהיה ערבוב מערך הנתונים עם זרעים נפרדים בכל עובד.

תצורה של ריבוי עובדים

אחד ההבדלים העיקריים במדריך זה (בהשוואה להדרכה מרובת GPU ) הוא ההגדרה של ריבוי עובדים. משתנה הסביבה TF_CONFIG הוא הדרך הסטנדרטית לציין את תצורת האשכול לכל עובד שהוא חלק מהאשכול.

ישנם שני מרכיבים של TF_CONFIG : cluster task . cluster מספק מידע על האשכול כולו, כלומר העובדים ושרתי הפרמטרים באשכול. task מספקת מידע על המשימה הנוכחית. cluster הרכיבים הראשון זהה עבור כל העובדים ושרתי הפרמטרים באשכול, task הרכיב השני שונה בכל עובד ושרת פרמטרים ומציינת type index משלו. בדוגמה זו, type המשימה הוא worker index המשימה הוא 0 .

למטרות המחשה, מדריך זה מראה כיצד להגדיר TF_CONFIG עם 2 עובדים על localhost . בפועל, תיצור מספר עובדים על כתובת IP ויציאה חיצונית, ותגדיר TF_CONFIG על כל עובד בצורה מתאימה, כלומר משנה את index המשימות .

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:12345", "localhost:23456"]
    },
    'task': {'type': 'worker', 'index': 0}
})

הגדירו את הדגם

כתוב את השכבות, האופטימיזציה ופונקציית ההפסד לאימון. מדריך זה מגדיר את המודל עם שכבות Keras, בדומה להדרכה של ריבוי GPU .

LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
  model = tf.keras.Sequential([
      tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
      tf.keras.layers.MaxPooling2D(),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(64, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  logits = model(features, training=False)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {'logits': logits}
    return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)

  optimizer = tf.compat.v1.train.GradientDescentOptimizer(
      learning_rate=LEARNING_RATE)
  loss = tf.keras.losses.SparseCategoricalCrossentropy(
      from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
  loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
  if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(mode, loss=loss)

  return tf.estimator.EstimatorSpec(
      mode=mode,
      loss=loss,
      train_op=optimizer.minimize(
          loss, tf.compat.v1.train.get_or_create_global_step()))

אסטרטגיית MultiWorkerMirrored

כדי לאמן את המודל, השתמש במופע של tf.distribute.experimental.MultiWorkerMirroredStrategy . MultiWorkerMirroredStrategy יוצר עותקים של כל המשתנים בשכבות המודל בכל מכשיר על פני כל העובדים. הוא משתמש ב- CollectiveOps , אופציה של TensorFlow לתקשורת קולקטיבית, כדי לצבור שיפועים ולשמור את המשתנים מסונכרנים. במדריך tf.distribute.Strategy יש פרטים נוספים על אסטרטגיה זו.

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_7505/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version.
Instructions for updating:
use distribute.MultiWorkerMirroredStrategy instead
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

אימון והערכת המודל

לאחר מכן, ציין את אסטרטגיית ההפצה ב- RunConfig עבור האומד, ואמן והעריך על ידי הפעלת tf.estimator.train_and_evaluate . מדריך זה מפיץ רק את ההדרכה על ידי ציון האסטרטגיה באמצעות train_distribute . אפשר גם להפיץ את ההערכה דרך eval_distribute .

config = tf.estimator.RunConfig(train_distribute=strategy)

classifier = tf.estimator.Estimator(
    model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
    classifier,
    train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
    eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies.
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7f3404234490>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/estimator.py:1244: StrategyBase.configure (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version.
Instructions for updating:
use `update_config_proto` instead.
INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy
INFO:tensorflow:Calling model_fn.
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Create CheckpointSaverHook.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version.
Instructions for updating:
Use the iterator's `initializer` property instead.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
2022-01-26 05:29:43.503603: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} }
    .  Registered:  device='CPU'

2022-01-26 05:29:43.504873: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} }
    .  Registered:  device='CPU'
INFO:tensorflow:loss = 2.292878, step = 0
INFO:tensorflow:loss = 2.292878, step = 0
INFO:tensorflow:global_step/sec: 173.275
INFO:tensorflow:global_step/sec: 173.275
INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec)
INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec)
INFO:tensorflow:global_step/sec: 189.057
INFO:tensorflow:global_step/sec: 189.057
INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec)
INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec)
INFO:tensorflow:global_step/sec: 193.075
INFO:tensorflow:global_step/sec: 193.075
INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec)
INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec)
INFO:tensorflow:global_step/sec: 199.957
INFO:tensorflow:global_step/sec: 199.957
INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec)
INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec)
INFO:tensorflow:global_step/sec: 204.217
INFO:tensorflow:global_step/sec: 204.217
INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec)
INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec)
INFO:tensorflow:global_step/sec: 201.747
INFO:tensorflow:global_step/sec: 201.747
INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec)
INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec)
INFO:tensorflow:global_step/sec: 206.079
INFO:tensorflow:global_step/sec: 206.079
INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec)
INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec)
INFO:tensorflow:global_step/sec: 231.299
INFO:tensorflow:global_step/sec: 231.299
INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec)
INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec)
INFO:tensorflow:global_step/sec: 657.044
INFO:tensorflow:global_step/sec: 657.044
INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec)
INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec)
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938...
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56
INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [10/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [20/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [30/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [40/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [50/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [60/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [70/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [80/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [90/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Evaluation [100/100]
INFO:tensorflow:Inference Time : 2.04637s
INFO:tensorflow:Inference Time : 2.04637s
INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58
INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131
INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938
INFO:tensorflow:Loss for final step: 1.10881.
INFO:tensorflow:Loss for final step: 1.10881.
({'loss': 2.234131, 'global_step': 938}, [])

ייעול ביצועי האימון

כעת יש לך דגם ואומדן בעל יכולת ריבוי עובדים המופעל על ידי tf.distribute.Strategy . אתה יכול לנסות את הטכניקות הבאות כדי לייעל את הביצועים של אימון מרובה עובדים:

  • הגדל את גודל האצווה: גודל האצווה המצוין כאן הוא ל-GPU. באופן כללי, גודל האצווה הגדול ביותר שמתאים לזיכרון ה-GPU מומלץ.
  • משתני Cast: העבר את המשתנים ל- tf.float אם אפשר. המודל הרשמי של ResNet כולל דוגמה כיצד ניתן לעשות זאת.
  • השתמש בתקשורת קולקטיבית: MultiWorkerMirroredStrategy מספקת יישומי תקשורת קולקטיביים מרובים.

    • RING מיישמת קולקטיבים מבוססי טבעת המשתמשים ב-gRPC כשכבת התקשורת בין מארחים.
    • NCCL משתמש ב- NCCL של Nvidia כדי ליישם קולקטיבים.
    • AUTO דוחה את הבחירה לזמן הריצה.

    הבחירה הטובה ביותר של הטמעה קולקטיבית תלויה במספר וסוג ה-GPUs, ובחיבור הרשת באשכול. כדי לעקוף את הבחירה האוטומטית, ציין ערך חוקי לפרמטר communication של הבנאי של MultiWorkerMirroredStrategy , למשל communication=tf.distribute.experimental.CollectiveCommunication.NCCL .

בקר בסעיף ביצועים במדריך כדי ללמוד עוד על אסטרטגיות וכלים אחרים שבהם אתה יכול להשתמש כדי לייעל את הביצועים של דגמי TensorFlow שלך.

דוגמאות קוד אחרות

  1. דוגמה מקצה לקצה להדרכה מרובת עובדים ב-tensorflow/אקולוגית באמצעות תבניות Kubernetes. דוגמה זו מתחילה במודל Keras וממירה אותו ל-Estimator באמצעות tf.keras.estimator.model_to_estimator API.
  2. מודלים רשמיים , שרבים מהם יכולים להיות מוגדרים להפעלת אסטרטגיות הפצה מרובות.