הצג באתר TensorFlow.org | הפעל בגוגל קולאב | צפה במקור ב-GitHub | הורד מחברת |
סקירה כללית
מדריך זה מדגים הדרכה מרובת עובדים עם API של לולאת הדרכה מותאמת אישית, המופץ באמצעות MultiWorkerMirroredStrategy, כך שמודל Keras שנועד לרוץ על עובד יחיד יכול לעבוד בצורה חלקה על מספר עובדים עם שינוי קוד מינימלי.
אנו משתמשים בלולאות אימון מותאמות אישית כדי לאמן את המודל שלנו מכיוון שהם נותנים לנו גמישות ושליטה רבה יותר באימונים. יתר על כן, קל יותר לנפות באגים במודל ובלולאת האימון. מידע מפורט יותר זמין בכתיבת לולאת אימון מאפס .
אם אתה מחפש כיצד להשתמש ב- MultiWorkerMirroredStrategy
עם keras model.fit
, עיין במדריך זה במקום זאת.
מדריך הדרכה מבוזרת ב-TensorFlow זמין עבור סקירה כללית של אסטרטגיות ההפצה שבהן תומכת TensorFlow למי שמעוניין בהבנה מעמיקה יותר של ממשקי API של tf.distribute.Strategy
.
להכין
ראשית, כמה יבוא נחוץ.
import json
import os
import sys
לפני ייבוא TensorFlow, בצע מספר שינויים בסביבה.
השבת את כל ה-GPUs. זה מונע שגיאות הנגרמות על ידי העובדים שכולם מנסים להשתמש באותו GPU. עבור יישום אמיתי כל עובד יהיה על מכונה אחרת.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
אפס את משתנה הסביבה TF_CONFIG
, תראה יותר על כך מאוחר יותר.
os.environ.pop('TF_CONFIG', None)
ודא שהספרייה הנוכחית נמצאת בנתיב של python. זה מאפשר למחברת לייבא את הקבצים שנכתבו על ידי %%writefile
מאוחר יותר.
if '.' not in sys.path:
sys.path.insert(0, '.')
כעת ייבא את TensorFlow.
import tensorflow as tf
מערך נתונים והגדרת מודל
לאחר מכן צור קובץ mnist.py
עם מודל פשוט והגדרת מערך נתונים. קובץ פיתון זה ישמש את תהליכי העבודה במדריך זה:
%%writefile mnist.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the range [0, 255].
# You need to convert them to float32 with values in the range [0, 1]
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000)
return train_dataset
def dataset_fn(global_batch_size, input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = mnist_dataset(batch_size)
dataset = dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
return dataset
def build_cnn_model():
return tf.keras.Sequential([
tf.keras.Input(shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
Writing mnist.py
תצורת ריבוי עובדים
עכשיו בואו ניכנס לעולם ההכשרה מרובת עובדים. ב-TensorFlow, משתנה הסביבה TF_CONFIG
נדרש לאימון במספר מכונות, שלכל אחת מהן יש אולי תפקיד אחר. TF_CONFIG
בשימוש להלן, היא מחרוזת JSON המשמשת לציון תצורת האשכול בכל עובד שהוא חלק מהאשכול. זוהי שיטת ברירת המחדל לציון אשכול, באמצעות cluster_resolver.TFConfigClusterResolver
, אך קיימות אפשרויות אחרות הזמינות במודול distribute.cluster_resolver
.
תאר את האשכול שלך
להלן תצורה לדוגמה:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
הנה אותו TF_CONFIG
כמחרוזת JSON:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
ישנם שני מרכיבים של TF_CONFIG
: cluster
task
.
cluster
זהה עבור כל העובדים ומספק מידע על אשכול ההכשרה, שהוא גזרה המורכבת מסוגים שונים של משרות כגוןworker
. בהכשרה מרובת עובדים עםMultiWorkerMirroredStrategy
, יש בדרך כללworker
אחד שלוקח על עצמו קצת יותר אחריות כמו שמירת מחסום וכתיבת קובץ סיכום עבור TensorBoard בנוסף למהworker
רגיל עושה. עובד כזה מכונה העובדchief
, ומקובלworker
עםindex
0 מתמנהworker
הראשי (למעשה כךtf.distribute.Strategy
).task
מספקת מידע על המשימה הנוכחית והיא שונה בכל עובד. הוא מציין אתtype
index
של אותו עובד.
בדוגמה זו, אתה מגדיר את type
המשימה "worker"
ואת index
המשימה ל 0
. מכונה זו היא העובדת הראשונה והיא תתמנה כעובדת הראשית ותעשה יותר עבודה מהאחרות. שים לב שלמכונות אחרות יהיה צורך להגדיר גם את משתנה הסביבה TF_CONFIG
, והוא צריך להיות בעל אותה cluster
, אך type
משימות או index
משימות שונה בהתאם לתפקידים של המכונות הללו.
למטרות המחשה, מדריך זה מראה כיצד ניתן להגדיר TF_CONFIG
עם 2 עובדים על localhost
. בפועל, משתמשים היו יוצרים מספר עובדים על כתובות IP חיצוניות/יציאות, TF_CONFIG
על כל עובד בצורה מתאימה.
בדוגמה זו תשתמש ב-2 עובדים, ה- TF_CONFIG
של העובד הראשון מוצג למעלה. עבור העובד השני היית מגדיר tf_config['task']['index']=1
למעלה, tf_config
הוא רק משתנה מקומי ב-python. כדי להשתמש בו בפועל כדי להגדיר אימון, יש לבצע סדרה של מילון זה בתור JSON, ולהציב אותו במשתנה הסביבה TF_CONFIG
.
משתני סביבה ותתי תהליכים במחברות
תהליכי משנה יורשים משתני סביבה מהאב שלהם. אז אם אתה מגדיר משתנה סביבה בתהליך jupyter notebook
:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
אתה יכול לגשת למשתנה הסביבה מתת-תהליכים:
echo ${GREETINGS}
Hello TensorFlow!
בסעיף הבא, תשתמש בזה כדי להעביר את ה- TF_CONFIG
לתת-תהליכי העבודה. לעולם לא היית באמת משיק את העבודות שלך בדרך זו, אבל זה מספיק למטרות המדריך הזה: כדי להדגים דוגמה מינימלית של ריבוי עובדים.
אסטרטגיית MultiWorkerMirrored
כדי לאמן את המודל, השתמש במופע של tf.distribute.MultiWorkerMirroredStrategy
, היוצר עותקים של כל המשתנים בשכבות המודל בכל מכשיר על פני כל העובדים. במדריך tf.distribute.Strategy
יש פרטים נוספים על אסטרטגיה זו.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO 2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
השתמש tf.distribute.Strategy.scope
כדי לציין שיש להשתמש באסטרטגיה בעת בניית המודל שלך. זה מכניס אותך ל"הקשר צולב העתקים " עבור אסטרטגיה זו, כלומר האסטרטגיה ניתנת לשליטה על דברים כמו מיקום משתנה.
import mnist
with strategy.scope():
# Model building needs to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
חלוקה אוטומטית של הנתונים שלך בין עובדים
בהכשרה מרובת עובדים, אין בהכרח צורך בפיצול מערכי נתונים, אולם זה נותן לך סמנטיקה חד פעמית בדיוק, מה שהופך את ההכשרה לניתנת יותר לשחזור, כלומר הכשרה על מספר עובדים צריכה להיות זהה לאימון על עובד אחד. הערה: הביצועים עשויים להיות מושפעים במקרים מסוימים.
ראה: distribute_datasets_from_function
per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
with strategy.scope():
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
הגדר לולאת אימון מותאמת אישית ואימון המודל
ציין אופטימיזציה
with strategy.scope():
# The creation of optimizer and train_accuracy will need to be in
# `strategy.scope()` as well, since they create variables.
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
הגדר שלב אימון באמצעות tf.function
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
שמירה ושחזור של מחסומים
הטמעת נקודת ביקורת בלולאת אימון מותאמת אישית מחייבת את המשתמש לטפל בה במקום להשתמש בהתקשרות חוזרת של Keras. זה מאפשר לך לשמור משקלים של הדגם ולשחזר אותם מבלי לשמור את כל הדגם.
from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and "chief" not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
כאן, תיצור tf.train.Checkpoint
אחד שעוקב אחר הדגם, המנוהל על ידי tf.train.CheckpointManager
כך שרק נקודת המחסום העדכנית נשמרת.
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
כעת, כאשר אתה צריך לשחזר, אתה יכול למצוא את המחסום האחרון שנשמר באמצעות הפונקציה הנוחה tf.train.latest_checkpoint
.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
לאחר שחזור המחסום, תוכל להמשיך באימון לולאת האימון המותאמת אישית שלך.
num_epochs = 3
num_steps_per_epoch = 70
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
# Once the `CheckpointManager` is set up, you're now ready to save, and remove
# the checkpoints non-chief workers saved.
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.849107, train_loss: 0.491886. Epoch: 1, accuracy: 0.937835, train_loss: 0.197650. Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.
הגדרת קוד מלאה על עובדים
כדי לפעול בפועל עם MultiWorkerMirroredStrategy
תצטרך להפעיל תהליכי עבודה ולהעביר להם TF_CONFIG
.
כמו הקובץ mnist.py
שנכתב קודם לכן, הנה ה- main.py
שמכיל את אותו הקוד שעברנו עליו שלב אחר שלב קודם לכן בקולאב הזה, אנחנו פשוט כותבים אותו לקובץ כך שכל אחד מהעובדים יריץ אותו:
קובץ: main.py
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist
from multiprocessing import util
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
num_epochs = 3
num_steps_per_epoch=70
# Checkpoint saving and restoring
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and 'chief' not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
# Define Strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id, cluster_spec = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id,
strategy.cluster_resolver.cluster_spec())
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
# Restoring the checkpoint
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
# Resume our CTL training
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
Writing main.py
לאמן ולהעריך
הספרייה הנוכחית מכילה כעת את שני קבצי Python:
ls *.py
main.py mnist.py
אז ה-json הסדר את ה- TF_CONFIG
והוסף אותו למשתני הסביבה:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
כעת, אתה יכול להפעיל תהליך עבודה שיריץ את ה- main.py
ולהשתמש ב- TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
יש לשים לב לכמה דברים לגבי הפקודה לעיל:
- הוא משתמש ב-
%%bash
שהוא "קסם" של מחברת כדי להפעיל כמה פקודות bash. - הוא משתמש בדגל
--bg
כדי להפעיל את תהליך ה-bash
ברקע, מכיוון שהעובד הזה לא יסתיים. זה מחכה לכל העובדים לפני שהוא מתחיל.
תהליך העבודה ברקע לא ידפיס פלט למחברת זו, אז &>
מפנה את הפלט שלו לקובץ, כדי שתוכל לראות מה קרה.
לכן, המתן מספר שניות עד שהתהליך יתחיל:
import time
time.sleep(20)
כעת תראה מה יצא לקובץ היומן של העובד עד כה:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
השורה האחרונה של קובץ היומן צריכה לומר: Started server with target: grpc://localhost:12345
. העובד הראשון מוכן כעת, ומחכה שכל שאר העובדים יהיו מוכנים להמשיך.
אז עדכן את tf_config
כדי שהתהליך של העובד השני יאסוף:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
כעת הפעל את העובד השני. זה יתחיל את ההכשרה מכיוון שכל העובדים פעילים (לכן אין צורך לרקע תהליך זה):
python main.py > /dev/null 2>&1
כעת, אם תבדוק שוב את היומנים שנכתבו על ידי העובד הראשון, תראה שהוא השתתף בהכשרת המודל הזה:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration 2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.832589, train_loss: 0.531260. Epoch: 1, accuracy: 0.936161, train_loss: 0.214774. Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
הכשרה רב עובדים לעומק
מדריך זה הדגים זרימת עבודה של Custom Training Loop
של ההגדרה מרובת עובדים. תיאור מפורט של נושאים אחרים זמין model.fit's guide
ההגדרה מרובת עובדים וישים ל-CTLs.
ראה גם
- מדריך הדרכה מבוזרת ב- TensorFlow מספק סקירה כללית של אסטרטגיות ההפצה הזמינות.
- מודלים רשמיים , שרבים מהם יכולים להיות מוגדרים להפעלת אסטרטגיות הפצה מרובות.
- קטע הביצועים במדריך מספק מידע על אסטרטגיות וכלים אחרים שבהם אתה יכול להשתמש כדי לייעל את הביצועים של דגמי TensorFlow שלך.