הצג באתר TensorFlow.org | הפעל בגוגל קולאב | צפה במקור ב-GitHub | הורד מחברת |
ממשקי ה-API של tf.distribute מספקים דרך קלה למשתמשים לשנות את ההכשרה שלהם ממכונה בודדת למספר מכונות. כאשר מרחיבים את המודל שלהם, המשתמשים צריכים גם להפיץ את הקלט שלהם על פני מספר מכשירים. tf.distribute
מספק ממשקי API שבאמצעותם תוכל להפיץ את הקלט שלך באופן אוטומטי בין מכשירים.
מדריך זה יראה לך את הדרכים השונות שבהן תוכל ליצור מערך נתונים ואיטרטורים מבוזרים באמצעות ממשקי API tf.distribute
. בנוסף, הנושאים הבאים יידונו:
- אפשרויות שימוש, פיצול ואצווה בעת שימוש ב-
tf.distribute.Strategy.experimental_distribute_dataset
ו-tf.distribute.Strategy.distribute_datasets_from_function
. - דרכים שונות שבהן אתה יכול לחזור על מערך הנתונים המבוזר.
- הבדלים בין ממשקי API של
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
וממשקי API שלtf.data
וכן כל מגבלה שמשתמשים עשויים להיתקל בהם בשימוש שלהם.
מדריך זה אינו מכסה שימוש בקלט מבוזר עם ממשקי API של Keras.
מערכי נתונים מבוזרים
כדי להשתמש בממשקי API של tf.distribute
מידה, מומלץ למשתמשים להשתמש ב- tf.data.Dataset
כדי לייצג את הקלט שלהם. tf.distribute
נעשתה לעבוד ביעילות עם tf.data.Dataset
(לדוגמה, שליפה אוטומטית מראש של נתונים לכל מכשיר מאיץ) עם אופטימיזציות של ביצועים שמשולבות באופן קבוע בהטמעה. אם יש לך מקרה שימוש לשימוש במשהו אחר מלבד tf.data.Dataset
, עיין בסעיף מאוחר יותר במדריך זה. בלולאת אימון לא מבוזרת, משתמשים יוצרים תחילה מופע tf.data.Dataset
ולאחר מכן חוזרים על האלמנטים. לדוגמה:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
כדי לאפשר למשתמשים להשתמש באסטרטגיית tf.distribute
עם שינויים מינימליים בקוד הקיים של המשתמש, הוצגו שני ממשקי API אשר יפיצו מופע tf.data.Dataset
ויחזירו אובייקט מערך נתונים מבוזר. לאחר מכן, משתמש יוכל לחזור על מופע הנתונים המבוזר הזה ולאמן את המודל שלו כמו קודם. הבה נסתכל כעת על שני ממשקי ה-API - tf.distribute.Strategy.experimental_distribute_dataset
ו- tf.distribute.Strategy.distribute_datasets_from_function
בפירוט רב יותר:
tf.distribute.Strategy.experimental_distribute_dataset
נוֹהָג
ממשק API זה לוקח מופע tf.data.Dataset
כקלט ומחזיר מופע tf.distribute.DistributedDataset
. עליך לאסוף את מערך הנתונים של הקלט עם ערך השווה לגודל האצווה הגלובלי. גודל אצווה גלובלי זה הוא מספר הדגימות שברצונך לעבד בכל המכשירים בשלב אחד. אתה יכול לחזור על מערך הנתונים המבוזר הזה בצורה פייתונית או ליצור איטרטור באמצעות iter
. האובייקט המוחזר אינו מופע tf.data.Dataset
ואינו תומך באף ממשק API אחר אשר ממיר או בודק את מערך הנתונים בשום אופן. זהו ה-API המומלץ אם אין לך דרכים ספציפיות שבהן ברצונך לחלק את הקלט שלך על פני העתקים שונים.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>) 2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
נכסים
אצווה
tf.distribute
משחזרת את מופע הקלט tf.data.Dataset
עם גודל אצווה חדש השווה לגודל האצווה העולמי חלקי מספר העתקים המסונכרנים. מספר ההעתקים המסונכרנים שווה למספר המכשירים שלוקחים חלק בשיפוע כל הפחתה במהלך האימון. כאשר משתמש קורא ל- next
באיטרטור המבוזר, גודל אצווה לכל עותק מוחזר על כל עותק. הקרדינליות של מערך הנתונים המחודשים תהיה תמיד כפולה של מספר ההעתקים. הנה כמה דוגמאות:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- ללא הפצה:
- אצווה 1: [0, 1, 2, 3]
- אצווה 2: [4, 5]
עם הפצה מעל 2 העתקים. האצווה האחרונה ([4, 5]) מחולקת בין 2 העתקים.
אצווה 1:
- העתק 1:[0, 1]
- העתק 2:[2, 3]
אצווה 2:
- העתק 2: [4]
- העתק 2: [5]
tf.data.Dataset.range(4).batch(4)
- ללא הפצה:
- אצווה 1: [[0], [1], [2], [3]]
- עם הפצה מעל 5 העתקים:
- אצווה 1:
- העתק 1: [0]
- העתק 2: [1]
- העתק 3: [2]
- העתק 4: [3]
- העתק 5: []
tf.data.Dataset.range(8).batch(4)
- ללא הפצה:
- אצווה 1: [0, 1, 2, 3]
- אצווה 2: [4, 5, 6, 7]
- עם הפצה על פני 3 העתקים:
- אצווה 1:
- העתק 1: [0, 1]
- העתק 2: [2, 3]
- העתק 3: []
- אצווה 2:
- העתק 1: [4, 5]
- העתק 2: [6, 7]
- העתק 3: []
לחלוקה חוזרת של מערך הנתונים יש מורכבות שטח שגדלה באופן ליניארי עם מספר ההעתקים. משמעות הדבר היא שבמקרה השימוש בהכשרה מרובת עובדים, צינור הקלט יכול להיתקל בשגיאות OOM.
ריסוק
tf.distribute
גם משבצת אוטומטית את מערך הנתונים של הקלט בהדרכה מרובת עובדים עם MultiWorkerMirroredStrategy
ו- TPUStrategy
. כל מערך נתונים נוצר במכשיר ה-CPU של העובד. פיצול אוטומטי של מערך נתונים על קבוצה של עובדים פירושה שלכל עובד מוקצית תת-קבוצה של מערך הנתונים כולו (אם מוגדרת ה- tf.data.experimental.AutoShardPolicy
הנכון). זאת כדי להבטיח שבכל שלב, גודל אצווה גלובלי של רכיבי נתונים שאינם חופפים יעובד על ידי כל עובד. לריסוק אוטומטי יש כמה אפשרויות שונות שניתן לציין באמצעות tf.data.experimental.DistributeOptions
. שימו לב שאין פיצול אוטומטי בהכשרה מרובה עובדים עם ParameterServerStrategy
, ומידע נוסף על יצירת מערכי נתונים עם אסטרטגיה זו ניתן למצוא במדריך אסטרטגיית שרת פרמטרים .
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
ישנן שלוש אפשרויות שונות שתוכל להגדיר עבור tf.data.experimental.AutoShardPolicy
:
- AUTO: זוהי אפשרות ברירת המחדל, כלומר יתבצע ניסיון לרסיס על ידי FILE. הניסיון לרסיס על ידי FILE נכשל אם לא זוהה מערך נתונים מבוסס קבצים. לאחר מכן
tf.distribute
יחזור לריסוק על ידי DATA. שים לב שאם מערך הנתונים של הקלט מבוסס על קבצים אך מספר הקבצים קטן ממספר העובדים,InvalidArgumentError
. אם זה קורה, הגדר במפורש את המדיניות ל-AutoShardPolicy.DATA
, או פצל את מקור הקלט שלך לקבצים קטנים יותר כך שמספר הקבצים גדול ממספר העובדים. קובץ: זו האפשרות אם ברצונך לגזור את קבצי הקלט על כל העובדים. עליך להשתמש באפשרות זו אם מספר קבצי הקלט גדול בהרבה ממספר העובדים והנתונים בקבצים מחולקים באופן שווה. החיסרון של אפשרות זו הוא שיש עובדים סרק אם הנתונים בקבצים אינם מופצים באופן שווה. אם מספר הקבצים קטן ממספר העובדים,
InvalidArgumentError
. אם זה קורה, הגדר במפורש את המדיניות ל-AutoShardPolicy.DATA
. לדוגמה, תן לנו להפיץ 2 קבצים על פני 2 עובדים עם עותק אחד כל אחד. קובץ 1 מכיל [0, 1, 2, 3, 4, 5] וקובץ 2 מכיל [6, 7, 8, 9, 10, 11]. תן למספר הכולל של העתקים המסונכרנים להיות 2 וגודל האצווה הגלובלי להיות 4.- עובד 0:
- אצווה 1 = העתק 1: [0, 1]
- אצווה 2 = העתק 1: [2, 3]
- אצווה 3 = העתק 1: [4]
- אצווה 4 = העתק 1: [5]
- עובד 1:
- אצווה 1 = העתק 2: [6, 7]
- אצווה 2 = העתק 2: [8, 9]
- אצווה 3 = העתק 2: [10]
- אצווה 4 = העתק 2: [11]
נתונים: זה יגזור אוטומטית את האלמנטים על פני כל העובדים. כל אחד מהעובדים יקרא את כל מערך הנתונים ויעבד רק את הרסיס שהוקצה לו. כל שאר הרסיסים יושלכו. זה משמש בדרך כלל אם מספר קבצי הקלט קטן ממספר העובדים ואתה רוצה פיצול טוב יותר של נתונים בין כל העובדים. החיסרון הוא שכל מערך הנתונים ייקרא על כל עובד. לדוגמה, תן לנו להפיץ קובץ אחד על פני 2 עובדים. קובץ 1 מכיל [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. תן למספר הכולל של העתקים המסונכרנים להיות 2.
- עובד 0:
- אצווה 1 = העתק 1: [0, 1]
- אצווה 2 = העתק 1: [4, 5]
- אצווה 3 = העתק 1: [8, 9]
- עובד 1:
- אצווה 1 = העתק 2: [2, 3]
- אצווה 2 = העתק 2: [6, 7]
- אצווה 3 = העתק 2: [10, 11]
כבוי: אם תבטל את הפיצול האוטומטי, כל עובד יעבד את כל הנתונים. לדוגמה, תן לנו להפיץ קובץ אחד על פני 2 עובדים. קובץ 1 מכיל [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. תן למספר הכולל של העתקים המסונכרנים להיות 2. אז כל עובד יראה את ההתפלגות הבאה:
- עובד 0:
- אצווה 1 = העתק 1: [0, 1]
- אצווה 2 = העתק 1: [2, 3]
- אצווה 3 = העתק 1: [4, 5]
- אצווה 4 = העתק 1: [6, 7]
- אצווה 5 = העתק 1: [8, 9]
אצווה 6 = העתק 1: [10, 11]
עובד 1:
אצווה 1 = העתק 2: [0, 1]
אצווה 2 = העתק 2: [2, 3]
אצווה 3 = העתק 2: [4, 5]
אצווה 4 = העתק 2: [6, 7]
אצווה 5 = העתק 2: [8, 9]
אצווה 6 = העתק 2: [10, 11]
אחזור מראש
כברירת מחדל, tf.distribute
מוסיף טרנספורמציה של אחזור מראש בסוף מופע tf.data.Dataset
שסופק על ידי המשתמש. הארגומנט לטרנספורמציה של אחזור מראש שהוא buffer_size
שווה למספר העתקים המסונכרנים.
tf.distribute.Strategy.distribute_datasets_from_function
נוֹהָג
API זה לוקח פונקציית קלט ומחזיר מופע tf.distribute.DistributedDataset
. לפונקציית הקלט שמשתמשים מעבירים יש ארגומנט tf.distribute.InputContext
והיא אמורה להחזיר מופע tf.data.Dataset
. עם API זה, tf.distribute
אינו מבצע שינויים נוספים במופע tf.data.Dataset
של המשתמש המוחזר מפונקציית הקלט. באחריות המשתמש לקבץ ולפצל את מערך הנתונים. tf.distribute
קורא לפונקציית הקלט בהתקן המעבד של כל אחד מהעובדים. מלבד לאפשר למשתמשים לציין את היגיון האצווה והפיצול שלהם, ממשק API זה גם מדגים מדרגיות וביצועים טובים יותר בהשוואה ל- tf.distribute.Strategy.experimental_distribute_dataset
כאשר הוא משמש להכשרה מרובה עובדים.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
נכסים
אצווה
המופע tf.data.Dataset
שהוא ערך ההחזרה של פונקציית הקלט צריך להיות באצווה באמצעות גודל אצווה לכל עותק. גודל האצווה לכל עותק הוא גודל האצווה העולמי חלקי מספר העתקים שלוקחים חלק באימון הסנכרון. הסיבה לכך היא ש- tf.distribute
קורא לפונקציית הקלט בהתקן המעבד של כל אחד מהעובדים. מערך הנתונים שנוצר על עובד נתון צריך להיות מוכן לשימוש על ידי כל העתקים באותו עובד.
ריסוק
האובייקט tf.distribute.InputContext
שמועבר באופן מרומז כארגומנט לפונקציית הקלט של המשתמש נוצר על ידי tf.distribute
מתחת למכסה המנוע. יש לו מידע על מספר העובדים, מזהה עובד נוכחי וכו'. פונקציית קלט זו יכולה להתמודד עם פיצול לפי מדיניות שנקבעה על ידי המשתמש באמצעות מאפיינים אלה שהם חלק מהאובייקט tf.distribute.InputContext
.
אחזור מראש
tf.distribute
אינו מוסיף טרנספורמציה של אחזור מראש בסוף מערך הנתונים tf.data.Dataset
המוחזר על ידי פונקציית הקלט שסופקה על ידי המשתמש.
איטרטורים מבוזרים
בדומה tf.data.Dataset
שאינם מבוזרים, תצטרך ליצור איטרטור tf.distribute.DistributedDataset
כדי לחזור עליו ולגשת לרכיבים ב- tf.distribute.DistributedDataset
. להלן הדרכים שבהן תוכל ליצור tf.distribute.DistributedIterator
ולהשתמש בו כדי לאמן את המודל שלך:
שימושים
השתמש בבניית לולאה Pythonic
אתה יכול להשתמש בלולאה Pythonic ידידותית למשתמש כדי לחזור על ה- tf.distribute.DistributedDataset
. האלמנטים המוחזרים מה- tf.distribute.DistributedIterator
יכולים להיות tf.Tensor
בודד או tf.distribute.DistributedValues
שמכיל ערך לכל העתק. מיקום הלולאה בתוך tf.function
.תעניק דחיפה לביצועים. עם זאת, break
return
אינם נתמכים כרגע עבור לולאה מעל tf.distribute.DistributedDataset
המוצב בתוך tf.function
.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\020TensorDataset:29" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
השתמש iter
כדי ליצור איטרטור מפורש
כדי לחזור על האלמנטים במופע tf.distribute.DistributedDataset
, אתה יכול ליצור tf.distribute.DistributedIterator
באמצעות ה-API של iter
שעליו. עם איטרטור מפורש, אתה יכול לבצע איטרציה עבור מספר קבוע של שלבים. על מנת לקבל את האלמנט הבא tf.distribute.DistributedIterator
dist_iterator
, אתה יכול לקרוא next(dist_iterator)
, dist_iterator.get_next()
או dist_iterator.get_next_as_optional()
. שני הראשונים זהים בעצם:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
עם next()
או tf.distribute.DistributedIterator.get_next()
, אם ה- tf.distribute.DistributedIterator
הגיע לקצה שלו, תיזרק שגיאת OutOfRange. הלקוח יכול לתפוס את השגיאה בצד הפיתון ולהמשיך בעבודות אחרות כגון בדיקה והערכה. עם זאת, זה לא יעבוד אם אתה משתמש בלולאת אימון מארח (כלומר, הרץ מספר שלבים לכל tf.function
), שנראה כך:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
מכיל שלבים מרובים על ידי גלישת גוף הצעד בתוך tf.range
. במקרה זה, איטרציות שונות בלולאה ללא תלות יכולות להתחיל במקביל, כך שניתן להפעיל שגיאת OutOfRange באיטרציות מאוחרות יותר לפני שהחישוב של האיטרציות הקודמות מסתיים. ברגע שנזרק שגיאת OutOfRange, כל האופציות בפונקציה יסתיימו מיד. אם זה מקרה שהיית רוצה להימנע ממנו, חלופה שאינה זורקת שגיאת OutOfRange היא tf.distribute.DistributedIterator.get_next_as_optional()
. get_next_as_optional
מחזירה tf.experimental.Optional
שמכילה את הרכיב הבא או ללא ערך אם ה- tf.distribute.DistributedIterator
הגיע לסיומו.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') 2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_0" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9 } } attr { key: "metadata" value { s: "\n\020RangeDataset:104" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
שימוש במאפיין element_spec
אם אתה מעביר את הרכיבים של מערך נתונים מבוזר ל- tf.function
ורוצה להבטיח tf.TypeSpec
, תוכל לציין את הארגומנט input_signature
של ה- tf.function
. הפלט של מערך נתונים מבוזר הוא tf.distribute.DistributedValues
שיכול לייצג את הקלט למכשיר בודד או למספר מכשירים. כדי לקבל את tf.TypeSpec
המתאים לערך מבוזר זה, אתה יכול להשתמש במאפיין element_spec
של מערך הנתונים המבוזר או אובייקט האיטרטור המבוזר.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:122" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
אצוות חלקיות
נתקלים באצוות חלקיות כאשר מופעי tf.data.Dataset
שמשתמשים יוצרים עשויים להכיל גדלי אצווה שאינם ניתנים לחלוקה שווה במספר העתקים או כאשר הקרדינליות של מופע מערך הנתונים אינה מתחלקת בגודל האצווה. משמעות הדבר היא שכאשר מערך הנתונים מופץ על פני מספר העתקים, הקריאה next
בכמה איטרטורים תגרום ל-OutOfRangeError. כדי לטפל במקרה השימוש הזה, tf.distribute
מחזיר אצווה דמה בגודל אצווה 0 על העתקים שאין להם יותר נתונים לעיבוד.
במקרה של עובד יחיד, אם הנתונים אינם מוחזרים בקריאה next
באיטרטור, נוצרות אצווה דמה בגודל אצווה 0 ומשמשות יחד עם הנתונים האמיתיים במערך הנתונים. במקרה של אצווה חלקית, אצווה הנתונים הגלובלית האחרונה תכיל נתונים אמיתיים לצד אצווה דמה של נתונים. תנאי העצירה לעיבוד נתונים בודק כעת אם לאחד מהעותקים יש נתונים. אם אין נתונים על אף אחת מהעותקים העתקים, נגרמת שגיאת OutOfRange.
במקרה של ריבוי עובדים, הערך הבוליאני המייצג נוכחות של נתונים על כל אחד מהעובדים נצבר באמצעות תקשורת צולבת העתק זה משמש כדי לזהות אם כל העובדים סיימו לעבד את מערך הנתונים המבוזר. מכיוון שזה כרוך בתקשורת צולבת בין עובדים, קיים עונש מסוים על ביצועים.
אזהרות
בעת שימוש בממשקי API של
tf.distribute.Strategy.experimental_distribute_dataset
עם הגדרת מספר עובדים, המשתמשים מעביריםtf.data.Dataset
שקורא מקבצים. אםtf.data.experimental.AutoShardPolicy
מוגדר ל-AUTO
אוFILE
, גודל האצווה בפועל לכל שלב עשוי להיות קטן יותר מגודל האצווה הגלובלי שהוגדר על ידי המשתמש. זה יכול לקרות כאשר הרכיבים הנותרים בקובץ קטנים מגודל האצווה העולמי. משתמשים יכולים למצות את מערך הנתונים מבלי להיות תלוי במספר השלבים להפעלה או להגדיר אתtf.data.experimental.AutoShardPolicy
ל-DATA
כדי לעקוף אותו.טרנספורמציות סטטיסטיות של מערך נתונים אינן נתמכות כעת עם
tf.distribute
, וכרגע מתעלמים מכל אופציה סטטיסטית שייתכן שיש למערך הנתונים. לדוגמה, אם למערך הנתונים שלך ישmap_fn
שמשתמש ב-tf.random.uniform
כדי לסובב תמונה, אז יש לך גרף נתונים שתלוי במצב (כלומר הזרע האקראי) במחשב המקומי שבו מתבצע תהליך הפיתון.tf.data.experimental.OptimizationOptions
ניסיוני אפשרויות המושבתות כברירת מחדל יכולות בהקשרים מסוימים -- כגון בשימוש יחד עםtf.distribute
-- לגרום לירידה בביצועים. עליך להפעיל אותם רק לאחר שתאמת שהם מועילים לביצועי עומס העבודה שלך בהגדרת הפצה.אנא עיין במדריך זה כיצד לייעל את צינור הקלט שלך עם
tf.data
באופן כללי. כמה טיפים נוספים:אם יש לך מספר עובדים ואתה משתמש ב-
tf.data.Dataset.list_files
כדי ליצור מערך נתונים מכל הקבצים התואמים דפוס גלוב אחד או יותר, זכור להגדיר את ארגומנט ה-seed
או להגדירshuffle=False
כך שכל עובד ירסק את הקובץ באופן עקבי.אם צינור הקלט שלך כולל ערבוב של הנתונים ברמת הרשומה וגם ניתוח הנתונים, אלא אם כן הנתונים שלא מנותחו גדולים משמעותית מהנתונים המנותחים (מה שבדרך כלל לא המקרה), תחילה ערבב ואז ניתוח, כפי שמוצג בדוגמה הבאה. זה עשוי להועיל לשימוש בזיכרון ולביצועים.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
שומר על מאגר פנימי של רכיביbuffer_size
, ובכך הפחתתbuffer_size
יכולה להקל על בעיית OOM.הסדר שבו הנתונים מעובדים על ידי העובדים בעת שימוש ב-
tf.distribute.experimental_distribute_dataset
אוtf.distribute.distribute_datasets_from_function
אינו מובטח. זה נדרש בדרך כלל אם אתה משתמש ב-tf.distribute
לחיזוי קנה מידה. עם זאת, ניתן להכניס אינדקס עבור כל רכיב באצווה ולהזמין פלטים בהתאם. הקטע הבא הוא דוגמה לאופן הזמנת פלטים.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46} 2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_4" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9223372036854775807 } } attr { key: "metadata" value { s: "\n\020RangeDataset:162" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } }
כיצד אוכל להפיץ את הנתונים שלי אם אני לא משתמש במופע קנוני של tf.data.Dataset?
לפעמים משתמשים לא יכולים להשתמש ב- tf.data.Dataset
כדי לייצג את הקלט שלהם, ולאחר מכן בממשקי ה-API שהוזכרו לעיל כדי להפיץ את מערך הנתונים למספר מכשירים. במקרים כאלה ניתן להשתמש בטנזורים גולמיים או בתשומות מגנרטור.
השתמש ב-experimental_distribute_values_from_function עבור קלט טנסור שרירותי
strategy.run
מקבל את tf.distribute.DistributedValues
שהוא הפלט של next(iterator)
. כדי להעביר את ערכי הטנסור, השתמש ב- experimental_distribute_values_from_function
כדי לבנות tf.distribute.DistributedValues
גולמיים.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
השתמש ב-tf.data.Dataset.from_generator אם הקלט שלך הוא ממחולל
אם יש לך פונקציית מחולל שברצונך להשתמש בה, תוכל ליצור מופע tf.data.Dataset
באמצעות ה-API from_generator
.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2" op: "FlatMapDataset" input: "TensorDataset/_1" attr { key: "Targuments" value { list { } } } attr { key: "_cardinality" value { i: -2 } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_flat_map_fn_3980" } } } attr { key: "metadata" value { s: "\n\022FlatMapDataset:178" } } attr { key: "output_shapes" value { list { shape { dim { size: 4 } } } } } attr { key: "output_types" value { list { type: DT_FLOAT } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.