אימון רב עובדים עם קרס

הצג באתר TensorFlow.org הפעל בגוגל קולאב צפה במקור ב-GitHub הורד מחברת

סקירה כללית

מדריך זה מדגים כיצד לבצע הכשרה מבוזרת מרובת עובדים עם מודל Keras ו-API של Model.fit באמצעות ה-API של tf.distribute.Strategy - במיוחד בכיתה tf.distribute.MultiWorkerMirroredStrategy . בעזרת אסטרטגיה זו, מודל Keras שתוכנן לפעול על עובד יחיד יכול לעבוד בצורה חלקה על מספר עובדים עם שינויי קוד מינימליים.

למי שמעוניין בהבנה מעמיקה יותר של ממשקי API של tf.distribute.Strategy , מדריך ההדרכה המבוזר ב-TensorFlow זמין עבור סקירה כללית של אסטרטגיות ההפצה שבהן תומכות TensorFlow.

כדי ללמוד כיצד להשתמש ב- MultiWorkerMirroredStrategy עם Keras ובלולאת אימון מותאמת אישית, עיין בלולאת אימון מותאמת אישית עם Keras ו-MultiWorkerMirroredStrategy .

שימו לב שמטרת המדריך הזה היא להדגים דוגמה מינימלית של ריבוי עובדים עם שני עובדים.

להכין

התחל עם כמה יבוא נחוץ:

import json
import os
import sys

לפני ייבוא ​​TensorFlow, בצע מספר שינויים בסביבה:

  1. השבת את כל ה-GPUs. זה מונע שגיאות הנגרמות על ידי העובדים שכולם מנסים להשתמש באותו GPU. ביישום בעולם האמיתי, כל עובד יהיה על מכונה אחרת.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. אפס את משתנה הסביבה TF_CONFIG (תלמד עוד על כך מאוחר יותר):
os.environ.pop('TF_CONFIG', None)
  1. ודא שהספרייה הנוכחית נמצאת בנתיב של Python - זה מאפשר למחברת לייבא את הקבצים שנכתבו על ידי %%writefile מאוחר יותר:
if '.' not in sys.path:
  sys.path.insert(0, '.')

כעת ייבא את TensorFlow:

import tensorflow as tf

מערך נתונים והגדרת מודל

לאחר מכן, צור קובץ mnist_setup.py עם מודל פשוט והגדרת מערך נתונים. קובץ Python זה ישמש את תהליכי העבודה במדריך זה:

%%writefile mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist_setup.py

הדרכת מודל על עובד בודד

נסה לאמן את המודל למספר קטן של תקופות וצפה בתוצאות של עובד בודד כדי לוודא שהכל עובד כמו שצריך. ככל שהאימון מתקדם, ההפסד אמור לרדת והדיוק צריך לעלות.

import mnist_setup

batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Epoch 1/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795
<keras.callbacks.History at 0x7f666a2e4510>

תצורה של ריבוי עובדים

עכשיו בואו ניכנס לעולם ההכשרה מרובת עובדים.

אשכול עם משרות ומשימות

ב-TensorFlow, הכשרה מבוזרת כוללת: 'cluster' עם מספר משרות, ולכל אחת מהעבודות עשויה להיות 'task' אחת או יותר.

תזדקק למשתנה סביבת התצורה TF_CONFIG לאימון במספר מכונות, שלכל אחת מהן יש אולי תפקיד אחר. TF_CONFIG היא מחרוזת JSON המשמשת לציון תצורת האשכול עבור כל עובד שהוא חלק מהאשכול.

ישנם שני מרכיבים של משתנה TF_CONFIG : 'cluster' ו-'משימה 'task' .

  • 'cluster' זהה לכל העובדים ומספק מידע על אשכול ההכשרה, שהוא גזרה המורכבת מסוגים שונים של משרות, כגון 'worker' או 'chief' .

    • בהכשרה מרובת עובדים עם tf.distribute.MultiWorkerMirroredStrategy , יש בדרך כלל 'worker' אחד שלוקח על עצמו אחריות, כמו שמירת מחסום וכתיבת קובץ סיכום עבור TensorBoard, בנוסף למה שעושה 'worker' רגיל. 'worker' כזה מכונה העובד הראשי (עם שם העבודה 'chief' ).
    • נהוג של'צ'יף 'chief' יתמנה 'index' 0 (למעשה, כך tf.distribute.Strategy ).
  • 'task' מספקת מידע על המשימה הנוכחית והיא שונה עבור כל עובד. הוא מציין את 'type' 'index' של אותו עובד.

להלן דוגמה לתצורה:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

הנה אותו TF_CONFIG כמחרוזת JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

שימו לב ש- tf_config הוא רק משתנה מקומי ב-Python. כדי להיות מסוגל להשתמש בו עבור תצורת אימון, dict זה צריך להיות מסודר כ-JSON ולהציב אותו במשתנה סביבה TF_CONFIG .

בתצורת הדוגמה שלמעלה, אתה מגדיר את המשימה 'type' 'worker' ואת המשימה 'index' ל 0 . לכן, מכונה זו היא העובדת הראשונה . הוא ימונה כעובד 'chief' ויעשה יותר עבודה מהאחרים.

למטרות המחשה, מדריך זה מראה כיצד תוכל להגדיר משתנה TF_CONFIG עם שני עובדים ב- localhost .

בפועל, תיצור מספר עובדים על כתובות/יציאות IP חיצוניות ותגדיר משתנה TF_CONFIG על כל עובד בהתאם.

במדריך זה, תשתמש בשני עובדים:

  • ה- TF_CONFIG של העובד הראשון ( 'chief' ) מוצג למעלה.
  • עבור העובד השני, תגדיר tf_config['task']['index']=1

משתני סביבה ותתי תהליכים במחברות

תהליכי משנה יורשים משתני סביבה מהאב שלהם.

לדוגמה, אתה יכול להגדיר משתנה סביבה בתהליך זה של Jupyter Notebook באופן הבא:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

לאחר מכן, תוכל לגשת למשתנה הסביבה מתת-תהליכים:

echo ${GREETINGS}
Hello TensorFlow!

בסעיף הבא, תשתמש בשיטה דומה כדי להעביר את ה- TF_CONFIG לתת-תהליכי העבודה. בתרחיש של העולם האמיתי, לא היית משיק את העבודות שלך בדרך זו, אבל זה מספיק בדוגמה זו.

בחר את האסטרטגיה הנכונה

ב-TensorFlow, קיימות שתי צורות עיקריות של הכשרה מבוזרת:

  • אימון סינכרוני , שבו שלבי ההדרכה מסונכרנים בין העובדים והעותקים, וכן
  • אימון אסינכרוני , שבו שלבי האימון אינם מסונכרנים בקפדנות (לדוגמה, אימון שרת פרמטרים ).

מדריך זה מדגים כיצד לבצע הדרכה סינכרונית מרובה עובדים באמצעות מופע של tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy יוצר עותקים של כל המשתנים בשכבות המודל בכל מכשיר על פני כל העובדים. הוא משתמש ב- CollectiveOps , אופציה של TensorFlow לתקשורת קולקטיבית, כדי לצבור שיפועים ולשמור את המשתנים מסונכרנים. במדריך tf.distribute.Strategy יש פרטים נוספים על אסטרטגיה זו.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy מספקת יישומים מרובים באמצעות הפרמטר tf.distribute.experimental.CommunicationOptions : 1) RING מיישמת קולקטיבים מבוססי טבעת המשתמשים ב-gRPC כשכבת התקשורת המארחת; 2) NCCL משתמשת בספריית התקשורת הקולקטיבית של NVIDIA כדי ליישם קולקטיבים; ו-3) AUTO דוחה את הבחירה לזמן הריצה. הבחירה הטובה ביותר של הטמעה קולקטיבית תלויה במספר וסוג ה-GPUs, ובחיבור הרשת באשכול.

אימון הדגם

עם השילוב של tf.distribute.Strategy API לתוך tf.keras , השינוי היחיד שתבצע כדי להפיץ את ההדרכה למספר עובדים הוא לצרף את בניית המודל ואת model.compile() בתוך strategy.scope() . היקף אסטרטגיית ההפצה מכתיב כיצד והיכן נוצרים המשתנים, ובמקרה של MultiWorkerMirroredStrategy , המשתנים שנוצרו הם MirroredVariable s, והם משוכפלים על כל אחד מהעובדים.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()

כדי לפעול בפועל עם MultiWorkerMirroredStrategy תצטרך להפעיל תהליכי עבודה ולהעביר להם TF_CONFIG .

כמו הקובץ mnist_setup.py שנכתב קודם לכן, הנה ה- main.py שכל אחד מהעובדים יפעיל:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

בקטע הקוד למעלה, שים לב שה- global_batch_size , שמועבר ל- Dataset.batch , מוגדר ל- per_worker_batch_size * num_workers . זה מבטיח שכל עובד מעבד אצוות של דוגמאות per_worker_batch_size ללא קשר למספר העובדים.

הספרייה הנוכחית מכילה כעת את שני קבצי Python:

ls *.py
main.py
mnist_setup.py

אז ה-json הסדר את ה- TF_CONFIG והוסף אותו למשתני הסביבה:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

כעת, אתה יכול להפעיל תהליך עבודה שיריץ את ה- main.py ולהשתמש ב- TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

יש לשים לב לכמה דברים לגבי הפקודה לעיל:

  1. הוא משתמש ב- %%bash שהוא "קסם" של מחברת כדי להפעיל כמה פקודות bash.
  2. הוא משתמש בדגל --bg כדי להפעיל את תהליך ה- bash ברקע, מכיוון שהעובד הזה לא יסתיים. זה מחכה לכל העובדים לפני שהוא מתחיל.

תהליך העבודה ברקע לא ידפיס פלט למחברת זו, ולכן &> מפנה את הפלט שלו לקובץ כך שתוכל לבדוק מה קרה בקובץ יומן יומן מאוחר יותר.

לכן, המתן מספר שניות עד שהתהליך יתחיל:

import time
time.sleep(10)

כעת, בדוק מה יצא לקובץ היומן של העובד עד כה:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

השורה האחרונה של קובץ היומן צריכה לומר: Started server with target: grpc://localhost:12345 . העובד הראשון מוכן כעת, ומחכה שכל שאר העובדים יהיו מוכנים להמשיך.

אז עדכן את tf_config כדי שהתהליך של העובד השני יאסוף:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

הפעל את העובד השני. זה יתחיל את ההכשרה מכיוון שכל העובדים פעילים (לכן אין צורך לרקע תהליך זה):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.

אם תבדוק שוב את היומנים שנכתבו על ידי העובד הראשון, תלמד שהוא השתתף בהכשרת המודל הזה:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901

באופן לא מפתיע, זה רץ לאט יותר מאשר הפעלת המבחן בתחילת המדריך הזה.

הפעלת מספר עובדים על מכונה אחת רק מוסיפה תקורה.

המטרה כאן לא הייתה לשפר את זמן האימון, אלא רק לתת דוגמה לאימון רב עובדים.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

הכשרה רב עובדים לעומק

עד כה, למדת כיצד לבצע הגדרה בסיסית של ריבוי עובדים.

במהלך שאר המדריך, תלמד על גורמים אחרים, שעשויים להיות שימושיים או חשובים עבור מקרי שימוש אמיתיים, בפירוט.

פיצול מערכי נתונים

בהכשרה מרובת עובדים, יש צורך בפיצול נתונים כדי להבטיח התכנסות וביצועים.

הדוגמה בסעיף הקודם מסתמכת על ברירת המחדל האוטומטית שסופקה על ידי ה-API של tf.distribute.Strategy . אתה יכול לשלוט בפיצול על ידי הגדרת tf.data.experimental.AutoShardPolicy של tf.data.experimental.DistributeOptions .

למידע נוסף על פיצול אוטומטי , עיין במדריך הקלט המבוזר .

הנה דוגמה מהירה כיצד לכבות את הריסוק האוטומטי, כך שכל העתק יעבד כל דוגמה ( לא מומלץ ):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

הַעֲרָכָה

אם תעביר את ה- validation_data לתוך Model.fit , זה יעבור לסירוגין בין אימון להערכה עבור כל תקופה. ההערכה שלוקחת את נתוני validation_data מופצת על פני אותה קבוצה של עובדים ותוצאות ההערכה מצטברות וזמינות עבור כל העובדים.

בדומה לאימון, מערך האימות נגזור אוטומטית ברמת הקובץ. עליך להגדיר גודל אצווה גלובלי במערך הנתונים של האימות ולהגדיר את validation_steps .

מומלץ גם מערך נתונים חוזר להערכה.

לחלופין, ניתן גם ליצור משימה נוספת שקוראת מעת לעת נקודות ביקורת ומריצה את ההערכה. זה מה שאומדן עושה. אבל זו לא דרך מומלצת לביצוע הערכה ולכן פרטיה נשמטים.

ביצועים

כעת יש לך מודל Keras שכולו מוגדר לפעול במספר עובדים עם MultiWorkerMirroredStrategy .

כדי לכוונן את הביצועים של הכשרה מרובה עובדים, אתה יכול לנסות את הפעולות הבאות:

  • tf.distribute.MultiWorkerMirroredStrategy מספקת יישומי תקשורת קולקטיביים מרובים:

    הבחירה הטובה ביותר של הטמעה קולקטיבית תלויה במספר ה-GPUs, סוג ה-GPUs וחיבור הרשת באשכול. כדי לעקוף את הבחירה האוטומטית, ציין את הפרמטר communication_options של הבנאי של MultiWorkerMirroredStrategy . לדוגמה:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • העבר את המשתנים ל- tf.float אם אפשר:

    • המודל הרשמי של ResNet כולל דוגמה כיצד ניתן לעשות זאת.

סובלנות לתקלות

באימון סינכרוני, האשכול ייכשל אם אחד העובדים ייכשל ולא קיים מנגנון שחזור כשלים.

שימוש ב-Keras עם tf.distribute.Strategy מגיע עם היתרון של סובלנות לתקלות במקרים בהם עובדים מתים או אינם יציבים בדרך אחרת. אתה יכול לעשות זאת על ידי שימור מצב האימון במערכת הקבצים המבוזרת לבחירתך, כך שעם הפעלה מחדש של המופע שנכשל בעבר או הקדים, מצב האימון ישוחזר.

כאשר עובד הופך לא זמין, עובדים אחרים ייכשלו (ייתכן לאחר פסק זמן). במקרים כאלה, יש להפעיל מחדש את העובד שאינו זמין, כמו גם עובדים אחרים שנכשלו.

Callback של ModelCheckpoint

התקשרות חוזרת ModelCheckpoint אינה מספקת עוד פונקציונליות של סובלנות תקלות, אנא השתמש במקום זאת ב- BackupAndRestore .

עדיין ניתן להשתמש בהתקשרות חזרה של ModelCheckpoint כדי לשמור נקודות ביקורת. אבל עם זה, אם האימון הופסק או הסתיים בהצלחה, כדי להמשיך את האימון מהמחסום, המשתמש אחראי לטעון את הדגם באופן ידני.

לחלופין, המשתמש יכול לבחור לשמור ולשחזר דגם/משקלים מחוץ להתקשרות חזרה ModelCheckpoint .

שמירת דגם וטעינה

כדי לשמור את הדגם שלך באמצעות model.save או tf.saved_model.save , יעד השמירה צריך להיות שונה עבור כל עובד.

  • עבור עובדים שאינם עובדים ראשיים, תצטרך לשמור את הדגם בספרייה זמנית.
  • עבור המפקד, תצטרך לשמור בספריית הדגמים המסופקת.

הספריות הזמניות של העובד צריכות להיות ייחודיות כדי למנוע שגיאות הנובעות ממספר עובדים שמנסים לכתוב לאותו מיקום.

הדגם שנשמר בכל המדריכים זהה, ובדרך כלל יש להפנות רק לדגם שנשמר על ידי המפקד לצורך שחזור או הגשה.

אמורה להיות לך היגיון ניקוי שמוחק את הספריות הזמניות שנוצרו על ידי העובדים לאחר השלמת ההכשרה שלך.

הסיבה לחיסכון במפקד ובעובדים בו-זמנית היא מכיוון שאתה עשוי לצבור משתנים במהלך נקודת הביקורת, מה שמחייב הן את המפקד והן מהעובדים להשתתף בפרוטוקול התקשורת של allreduce. מצד שני, מתן אפשרות למפקד ולעובדים לשמור באותו ספריית דגמים תגרום לשגיאות עקב מחלוקת.

באמצעות MultiWorkerMirroredStrategy , התוכנית מופעלת על כל עובד, וכדי לדעת אם העובד הנוכחי הוא ראשי, היא מנצלת את אובייקט פותר האשכולות שיש לו תכונות task_type ו- task_id :

  • task_type אומר לך מהי המשרה הנוכחית (למשל 'worker' ).
  • task_id אומר לך את המזהה של העובד.
  • העובד עם task_id == 0 מוגדר כעובד הראשי.

בקטע הקוד למטה, הפונקציה write_filepath מספקת את נתיב הקובץ לכתיבה, שתלוי ב- task_id של העובד:

  • עבור העובד הראשי (עם task_id == 0 ), הוא כותב לנתיב הקובץ המקורי.
  • עבור עובדים אחרים, הוא יוצר ספרייה זמנית - temp_dir - עם ה- task_id בנתיב הספרייה לכתיבה:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

עם זה, אתה מוכן כעת לחסוך:

multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

כפי שתואר לעיל, בהמשך יש לטעון את המודל רק ממפקד הנתיב שנשמר אליו, אז בואו נסיר את הזמניים שהעובדים הלא ראשיים הצילו:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

עכשיו, כשמגיע הזמן לטעון, בואו נשתמש ב- tf.keras.models.load_model API נוח ונמשיך בעבודה נוספת.

כאן, נניח שמשתמשים רק בעובד בודד כדי לטעון ולהמשיך את ההדרכה, ובמקרה זה אינך קורא tf.keras.models.load_model בתוך strategy.scope() אחר (שים לב ש- strategy = tf.distribute.MultiWorkerMirroredStrategy() , כפי שהוגדר קודם לכן ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773
<keras.callbacks.History at 0x7f6669989750>

שמירה ושחזור של מחסומים

מצד שני, המחסום מאפשר לשמור את המשקולות של הדגם שלך ולשחזר אותם ללא צורך לשמור את כל הדגם.

כאן תיצור tf.train.Checkpoint אחד שעוקב אחר הדגם, המנוהל על ידי tf.train.CheckpointManager , כך שרק נקודת הבידוק העדכנית נשמרת:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

לאחר הגדרת CheckpointManager , אתה מוכן כעת לשמור ולהסיר את המחסומים שהעובדים הלא-ראשיים שמרו:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

כעת, כאשר אתה צריך לשחזר את הדגם, אתה יכול למצוא את המחסום האחרון שנשמר באמצעות הפונקציה הנוחה tf.train.latest_checkpoint . לאחר שחזור המחסום, ניתן להמשיך בהדרכה.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/2
2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938
<keras.callbacks.History at 0x7f6669589850>

BackupAndRestore התקשרות חוזרת

ה-callback tf.keras.callbacks.BackupAndRestore מספק את פונקציונליות סובלנות התקלות על ידי גיבוי הדגם ומספר העידן הנוכחי בקובץ נקודת ביקורת זמני תחת ארגומנט backup_dir ל- BackupAndRestore . זה נעשה בסוף כל תקופה.

ברגע שעבודות מופרעות ומתחילות מחדש, ההתקשרות חזרה משחזרת את המחסום האחרון, והאימונים נמשכים מתחילת העידן שנקטע. כל אימון חלקי שכבר נעשה בעידן הבלתי נגמר לפני ההפרעה ייזרק, כך שהוא לא ישפיע על מצב הדגם הסופי.

כדי להשתמש בו, ספק מופע של tf.keras.callbacks.BackupAndRestore בשיחת Model.fit .

עם MultiWorkerMirroredStrategy , אם עובד מופרע, האשכול כולו עוצר עד שהעובד שהופסק יופעל מחדש. עובדים אחרים יתחילו גם הם, והעובד שהופסק מצטרף מחדש לאשכול. לאחר מכן, כל עובד קורא את קובץ המחסום שנשמר בעבר וקולט את מצבו הקודם, ובכך מאפשר לאשכול לחזור לסנכרון. לאחר מכן, האימונים נמשכים.

ההתקשרות חזרה של BackupAndRestore משתמש ב- CheckpointManager כדי לשמור ולשחזר את מצב האימון, מה שיוצר קובץ שנקרא checkpoint שעוקב אחר נקודות ביקורת קיימות יחד עם הקובץ האחרון. מסיבה זו, אין לעשות שימוש חוזר ב- backup_dir לאחסון נקודות ביקורת אחרות כדי למנוע התנגשות שמות.

נכון לעכשיו, ה- BackupAndRestore תומך בהדרכה של עובד יחיד ללא אסטרטגיה - MirroredStrategy - והדרכה מרובת עובדים עם MultiWorkerMirroredStrategy .

להלן שתי דוגמאות הן להכשרה מרובה עובדים והן להכשרה לעובד יחיד:

# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614
<keras.callbacks.History at 0x7f6669555d90>

אם תבדוק את הספרייה של backup_dir שציינת ב- BackupAndRestore , ייתכן שתבחין בכמה קבצי נקודת ביקורת שנוצרו באופן זמני. קבצים אלה נחוצים לשחזור המופעים שאבדו בעבר, והם יוסרו על ידי הספרייה בסוף Model.fit עם יציאה מוצלחת מההכשרה שלך.

משאבים נוספים

  1. המדריך המבוזר ב-TensorFlow מספק סקירה כללית של אסטרטגיות ההפצה הזמינות.
  2. לולאת האימון המותאמת אישית עם Keras ו-MultiWorkerMirroredStrategy מראה כיצד להשתמש ב- MultiWorkerMirroredStrategy עם Keras ובלולאת אימון מותאמת אישית.
  3. בדוק את הדגמים הרשמיים , שרבים מהם ניתנים להגדרה להפעלת אסטרטגיות הפצה מרובות.
  4. המדריך Better Performance with tf.function מספק מידע על אסטרטגיות וכלים אחרים, כגון TensorFlow Profiler שאתה יכול להשתמש בו כדי לייעל את הביצועים של דגמי TensorFlow שלך.