عرض على TensorFlow.org | تشغيل في Google Colab | عرض المصدر على جيثب | تحميل دفتر |
توفر واجهات برمجة التطبيقات tf.distribute طريقة سهلة للمستخدمين لتوسيع نطاق تدريبهم من جهاز واحد إلى أجهزة متعددة. عند قياس نموذجهم ، يتعين على المستخدمين أيضًا توزيع مدخلاتهم عبر أجهزة متعددة. يوفر tf.distribute
واجهات برمجة تطبيقات يمكنك من خلالها توزيع إدخالاتك تلقائيًا عبر الأجهزة.
سيوضح لك هذا الدليل الطرق المختلفة التي يمكنك من خلالها إنشاء مجموعة بيانات ومكررات موزعة باستخدام tf.distribute
APIs. بالإضافة إلى ذلك ، سيتم تغطية الموضوعات التالية:
- خيارات الاستخدام والتجزئة والتجميع عند استخدام
tf.distribute.Strategy.experimental_distribute_dataset
وtf.distribute.Strategy.distribute_datasets_from_function
. - الطرق المختلفة التي يمكنك من خلالها التكرار عبر مجموعة البيانات الموزعة.
- الاختلافات بين
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
APIs وtf.data
APIs وكذلك أي قيود قد يواجهها المستخدمون في استخدامهم.
لا يغطي هذا الدليل استخدام المدخلات الموزعة مع Keras APIs.
مجموعات البيانات الموزعة
لاستخدام tf.distribute
APIs للتوسع ، يوصى بأن يستخدم المستخدمون tf.data.Dataset
لتمثيل مدخلاتهم. تم إجراء توزيع tf.data.Dataset
tf.distribute
على سبيل المثال ، الجلب المسبق التلقائي للبيانات على كل جهاز تسريع) مع تحسينات الأداء التي يتم دمجها بانتظام في التنفيذ. إذا كانت لديك حالة استخدام لاستخدام شيء آخر غير tf.data.Dataset
، فيرجى الرجوع إلى قسم لاحق في هذا الدليل. في حلقة التدريب غير الموزعة ، يقوم المستخدمون أولاً بإنشاء مثيل tf.data.Dataset
ثم التكرار على العناصر. فمثلا:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
للسماح للمستخدمين باستخدام إستراتيجية tf.distribute
مع الحد الأدنى من التغييرات على التعليمات البرمجية الحالية للمستخدم ، تم تقديم اثنين من واجهات برمجة التطبيقات التي من شأنها توزيع مثيل tf.data.Dataset
وإرجاع كائن مجموعة البيانات الموزعة. يمكن للمستخدم بعد ذلك التكرار عبر مثيل مجموعة البيانات الموزعة هذا وتدريب نموذجهم كما كان من قبل. دعونا نلقي نظرة الآن على واجهتي API - tf.distribute.Strategy.experimental_distribute_dataset
و tf.distribute.Strategy.distribute_datasets_from_function
:
tf.distribute.Strategy.experimental_distribute_dataset
إستعمال
تأخذ واجهة برمجة التطبيقات هذه مثيل tf.data.Dataset
كإدخال وترجع مثيل tf.distribute.DistributedDataset
. يجب عليك تجميع مجموعة بيانات الإدخال بقيمة تساوي حجم الدُفعة العالمي. حجم الدُفعة العالمي هذا هو عدد العينات التي تريد معالجتها عبر جميع الأجهزة في خطوة واحدة. يمكنك تكرار مجموعة البيانات الموزعة هذه بأسلوب Pythonic أو إنشاء مكرر باستخدام iter
. الكائن الذي تم إرجاعه ليس مثيل tf.data.Dataset
ولا يدعم أي واجهات برمجة تطبيقات أخرى تقوم بتحويل مجموعة البيانات أو فحصها بأي شكل من الأشكال. هذه هي واجهة برمجة التطبيقات الموصى بها إذا لم يكن لديك طرق محددة تريد من خلالها مشاركة إدخالاتك عبر نسخ متماثلة مختلفة.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>) 2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
الخصائص
الخلط
يعيد توزيع tf.distribute
نسخ الإدخال tf.data.Dataset
بحجم دفعة جديد يساوي حجم الدُفعة العام مقسومًا على عدد النسخ المتماثلة قيد المزامنة. عدد النسخ المتماثلة المتزامنة يساوي عدد الأجهزة التي تشارك في تقليل التدرج اللوني أثناء التدريب. عندما يقوم المستخدم بالاتصال next
ذلك على المكرر الموزع ، يتم إرجاع حجم مجموعة البيانات لكل نسخة متماثلة على كل نسخة متماثلة. سيكون عدد العناصر الأصلية لمجموعة البيانات المعاد تطابقه دائمًا مضاعفًا لعدد النسخ المتماثلة. هنا بضعة أمثلة:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- بدون توزيع:
- الدفعة 1: [0 ، 1 ، 2 ، 3]
- الدفعة 2: [4، 5]
مع توزيع أكثر من نسختين متماثلة. يتم تقسيم الدفعة الأخيرة ([4 ، 5]) بين نسختين متماثلتين.
الدفعة 1:
- النسخة المتماثلة 1: [0، 1]
- نسخة 2: [2، 3]
الدفعة 2:
- نسخة 2: [4]
- نسخة 2: [5]
tf.data.Dataset.range(4).batch(4)
- بدون توزيع:
- الدفعة 1: [[0] ، [1] ، [2] ، [3]]
- مع التوزيع على 5 نسخ طبق الأصل:
- الدفعة 1:
- النسخة المتماثلة 1: [0]
- النسخة المتماثلة 2: [1]
- نسخة 3: [2]
- نسخة 4: [3]
- النسخة المكررة 5: []
tf.data.Dataset.range(8).batch(4)
- بدون توزيع:
- الدفعة 1: [0 ، 1 ، 2 ، 3]
- الدفعة 2: [4 ، 5 ، 6 ، 7]
- مع التوزيع على 3 نسخ طبق الأصل:
- الدفعة 1:
- النسخة المتماثلة 1: [0، 1]
- نسخة 2: [2، 3]
- نسخة 3: []
- الدفعة 2:
- نسخة 1: [4، 5]
- نسخة 2: [6، 7]
- نسخة 3: []
تحتوي إعادة تجميع مجموعة البيانات على تعقيد مساحة يزداد خطيًا مع عدد النسخ المتماثلة. هذا يعني أنه بالنسبة لحالة استخدام التدريب متعدد العمال ، يمكن أن يتعرض خط أنابيب الإدخال لأخطاء OOM.
التشرذم
tf.distribute
أيضًا بالتوزيع التلقائي لمجموعة بيانات الإدخال في تدريب متعدد العمال باستخدام MultiWorkerMirroredStrategy
و TPUStrategy
. يتم إنشاء كل مجموعة بيانات على جهاز CPU الخاص بالعامل. تعني المشاركة التلقائية لمجموعة بيانات عبر مجموعة من العمال أنه يتم تعيين مجموعة فرعية من مجموعة البيانات بأكملها لكل عامل (إذا تم تعيين tf.data.experimental.AutoShardPolicy
الصحيح). هذا لضمان أنه في كل خطوة ، سيتم معالجة حجم دفعة عالمية لعناصر مجموعة البيانات غير المتداخلة بواسطة كل عامل. يحتوي Autosharding على خيارين مختلفين يمكن تحديدهما باستخدام tf.data.experimental.DistributeOptions
. لاحظ أنه لا توجد مشاركة تلقائية في تدريب العمال المتعددين باستخدام ParameterServerStrategy
، ويمكن العثور على مزيد من المعلومات حول إنشاء مجموعة البيانات باستخدام هذه الإستراتيجية في البرنامج التعليمي Parameter Server Strategy .
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
توجد ثلاثة خيارات مختلفة يمكنك تعيينها لـ tf.data.experimental.AutoShardPolicy
:
- تلقائي: هذا هو الخيار الافتراضي مما يعني أنه سيتم إجراء محاولة لجزء بواسطة FILE. تفشل محاولة التجزئة حسب FILE إذا لم يتم اكتشاف مجموعة بيانات قائمة على ملف. ثم يعود
tf.distribute
إلى التجزئة حسب البيانات. لاحظ أنه إذا كانت مجموعة بيانات الإدخال تستند إلى ملف ولكن عدد الملفات أقل من عدد العمال ، فسيتم رفع خطأInvalidArgumentError
. إذا حدث هذا ، فاضبط السياسة صراحةً علىAutoShardPolicy.DATA
، أو قسّم مصدر الإدخال إلى ملفات أصغر بحيث يكون عدد الملفات أكبر من عدد العمال. FILE: هذا هو الخيار إذا كنت تريد تقسيم ملفات الإدخال إلى جميع العاملين. يجب عليك استخدام هذا الخيار إذا كان عدد ملفات الإدخال أكبر بكثير من عدد العمال والبيانات الموجودة في الملفات موزعة بالتساوي. يتمثل الجانب السلبي لهذا الخيار في وجود عمال خاملين إذا لم يتم توزيع البيانات الموجودة في الملفات بالتساوي. إذا كان عدد الملفات أقل من عدد العمال ، فسيتم رفع خطأ
InvalidArgumentError
. إذا حدث هذا ، فاضبط السياسة صراحةً علىAutoShardPolicy.DATA
. على سبيل المثال ، دعنا نوزع ملفين على عاملين مع نسخة متماثلة واحدة لكل منهما. يحتوي الملف 1 على [0 ، 1 ، 2 ، 3 ، 4 ، 5] ويحتوي الملف 2 على [6 ، 7 ، 8 ، 9 ، 10 ، 11]. اجعل العدد الإجمالي للنسخ المتماثلة المتزامنة 2 ويكون حجم الدُفعة العام 4.- عامل 0:
- الدفعة 1 = النسخة المتماثلة 1: [0 ، 1]
- الدفعة 2 = النسخة المتماثلة 1: [2، 3]
- الدفعة 3 = النسخة المتماثلة 1: [4]
- الدفعة 4 = النسخة المتماثلة 1: [5]
- العامل 1:
- الدفعة 1 = النسخة المتماثلة 2: [6 ، 7]
- الدفعة 2 = النسخة المتماثلة 2: [8 ، 9]
- الدفعة 3 = النسخة المتماثلة 2: [10]
- الدفعة 4 = النسخة المتماثلة 2: [11]
البيانات: سيؤدي هذا إلى تجميع العناصر تلقائيًا عبر جميع العمال. سيقرأ كل عامل مجموعة البيانات بأكملها ويعالج الجزء المخصص لها فقط. سيتم التخلص من جميع القطع الأخرى. يتم استخدام هذا بشكل عام إذا كان عدد ملفات الإدخال أقل من عدد العمال وتريد تقسيم البيانات بشكل أفضل عبر جميع العاملين. الجانب السلبي هو أن مجموعة البيانات بأكملها ستتم قراءتها على كل عامل. على سبيل المثال ، دعونا نوزع ملفًا واحدًا على عاملين. يحتوي الملف 1 على [0 ، 1 ، 2 ، 3 ، 4 ، 5 ، 6 ، 7 ، 8 ، 9 ، 10 ، 11]. اجعل العدد الإجمالي للنسخ المتماثلة المتزامنة 2.
- عامل 0:
- الدفعة 1 = النسخة المتماثلة 1: [0 ، 1]
- الدفعة 2 = النسخة المتماثلة 1: [4 ، 5]
- الدفعة 3 = النسخة المتماثلة 1: [8 ، 9]
- العامل 1:
- الدفعة 1 = النسخة المتماثلة 2: [2 ، 3]
- الدفعة 2 = النسخة المتماثلة 2: [6 ، 7]
- الدفعة 3 = النسخة المتماثلة 2: [10 ، 11]
إيقاف التشغيل: إذا قمت بإيقاف تشغيل ميزة "المشاركة التلقائية" ، فسيقوم كل عامل بمعالجة جميع البيانات. على سبيل المثال ، دعونا نوزع ملفًا واحدًا على عاملين. يحتوي الملف 1 على [0 ، 1 ، 2 ، 3 ، 4 ، 5 ، 6 ، 7 ، 8 ، 9 ، 10 ، 11]. دع العدد الإجمالي للنسخ المتماثلة المتزامنة هو 2. ثم سيرى كل عامل التوزيع التالي:
- عامل 0:
- الدفعة 1 = النسخة المتماثلة 1: [0 ، 1]
- الدفعة 2 = النسخة المتماثلة 1: [2، 3]
- الدفعة 3 = النسخة المتماثلة 1: [4 ، 5]
- الدفعة 4 = النسخة المتماثلة 1: [6 ، 7]
- الدفعة 5 = النسخة المتماثلة 1: [8 ، 9]
الدفعة 6 = النسخة المتماثلة 1: [10 ، 11]
العامل 1:
الدفعة 1 = النسخة المتماثلة 2: [0 ، 1]
الدفعة 2 = النسخة المتماثلة 2: [2، 3]
الدفعة 3 = النسخة المتماثلة 2: [4 ، 5]
الدفعة 4 = النسخة المتماثلة 2: [6 ، 7]
الدفعة 5 = النسخة المتماثلة 2: [8 ، 9]
الدفعة 6 = النسخة المتماثلة 2: [10 ، 11]
الجلب المسبق
بشكل افتراضي ، يضيف tf.distribute
تحويل الجلب المسبق في نهاية مثيل tf.data.Dataset
المقدم من المستخدم. وسيطة تحويل الجلب المسبق والتي تكون buffer_size
تساوي عدد النسخ المتماثلة المتزامنة.
tf.distribute.Strategy.distribute_datasets_from_function
إستعمال
تأخذ واجهة برمجة التطبيقات هذه دالة إدخال وترجع مثيل tf.distribute.DistributedDataset
. دالة الإدخال التي يمررها المستخدمون لها وسيطة tf.distribute.InputContext
ويجب أن تُرجع مثيل tf.data.Dataset
. باستخدام واجهة برمجة التطبيقات هذه ، لا tf.distribute
أية تغييرات أخرى على مثيل tf.data.Dataset
الخاص بالمستخدم الذي تم إرجاعه من دالة الإدخال. تقع على عاتق المستخدم مسؤولية تجميع مجموعة البيانات وتقسيمها. يستدعي tf.distribute
وظيفة الإدخال على جهاز وحدة المعالجة المركزية لكل عامل. بصرف النظر عن السماح للمستخدمين بتحديد منطق التجميع والتجزئة الخاص بهم ، توضح واجهة برمجة التطبيقات هذه أيضًا قابلية تطوير وأداء أفضل مقارنةً بـ tf.distribute.Strategy.experimental_distribute_dataset
عند استخدامها لتدريب العديد من العمال.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
الخصائص
الخلط
يجب تجميع مثيل tf.data.Dataset
الذي يمثل قيمة الإرجاع لوظيفة الإدخال في دُفعات باستخدام حجم الدُفعة لكل نسخة متماثلة. حجم الدُفعة لكل نسخة متماثلة هو حجم الدُفعة العمومي مقسومًا على عدد النسخ المتماثلة التي تشارك في تدريب المزامنة. هذا لأن tf.distribute
يستدعي وظيفة الإدخال على جهاز وحدة المعالجة المركزية لكل عامل. يجب أن تكون مجموعة البيانات التي تم إنشاؤها على عامل معين جاهزة للاستخدام من قبل جميع النسخ المتماثلة الموجودة على هذا العامل.
التشرذم
يتم إنشاء الكائن tf.distribute.InputContext
الذي يتم تمريره ضمنيًا كوسيطة إلى وظيفة الإدخال الخاصة بالمستخدم بواسطة tf.distribute
أسفل الغطاء. يحتوي على معلومات حول عدد العمال ومعرف العامل الحالي وما إلى ذلك. يمكن لوظيفة الإدخال هذه معالجة التجزئة وفقًا للسياسات التي حددها المستخدم باستخدام هذه الخصائص التي تعد جزءًا من كائن tf.distribute.InputContext
.
الجلب المسبق
لا يضيف tf.distribute
تحويل الجلب المسبق في نهاية tf.data.Dataset
التي يتم إرجاعها بواسطة وظيفة الإدخال التي يوفرها المستخدم.
تكرارات موزعة
على غرار مثيلات tf.data.Dataset
غير الموزعة ، ستحتاج إلى إنشاء مكرر في مثيلات tf.distribute.DistributedDataset
والوصول إلى العناصر الموجودة في tf.distribute.DistributedDataset
. فيما يلي الطرق التي يمكنك من خلالها إنشاء tf.distribute.DistributedIterator
واستخدامه لتدريب نموذجك:
الأعراف
استخدم بنية Pythonic for loop
يمكنك استخدام حلقة Pythonic سهلة الاستخدام للتكرار عبر tf.distribute.DistributedDataset
. العناصر التي تم إرجاعها من tf.distribute.DistributedIterator
يمكن أن تكون tf.Tensor
واحد أو tf.distribute.DistributedValues
التي تحتوي على قيمة لكل نسخة متماثلة. سيؤدي وضع الحلقة داخل tf.function
إلى تعزيز الأداء. ومع ذلك ، فإن break
return
غير مدعومين حاليًا لحلقة عبر tf.distribute.DistributedDataset
الموضوعة داخل tf.function
.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\020TensorDataset:29" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
استخدم iter
لإنشاء مكرر صريح
للتكرار على العناصر في مثيل tf.distribute.DistributedDataset
، يمكنك إنشاء tf.distribute.DistributedIterator
باستخدام واجهة برمجة التطبيقات iter
عليه. باستخدام مكرر صريح ، يمكنك التكرار لعدد ثابت من الخطوات. من أجل الحصول على العنصر التالي من tf.distribute.DistributedIterator
مثيل dist_iterator
، يمكنك استدعاء next(dist_iterator)
أو dist_iterator.get_next()
أو dist_iterator.get_next_as_optional()
. السابقان هما في الأساس نفس الشيء:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
مع next()
أو tf.distribute.DistributedIterator.get_next()
، إذا وصل tf.distribute.DistributedIterator
إلى نهايته ، فسيتم طرح خطأ OutOfRange. يمكن للعميل اكتشاف الخطأ من جانب Python ومواصلة القيام بأعمال أخرى مثل نقاط التفتيش والتقييم. ومع ذلك ، لن ينجح هذا إذا كنت تستخدم حلقة تدريب مضيف (على سبيل المثال ، قم بتشغيل خطوات متعددة لكل tf.function
) ، والتي تبدو كما يلي:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
يحتوي train_fn
على خطوات متعددة عن طريق لف جسم الخطوة داخل نطاق tf.range
. في هذه الحالة ، يمكن أن تبدأ التكرارات المختلفة في الحلقة بدون تبعية بالتوازي ، لذلك يمكن تشغيل خطأ OutOfRange في التكرارات اللاحقة قبل انتهاء حساب التكرارات السابقة. بمجرد إلقاء خطأ OutOfRange ، سيتم إنهاء جميع العمليات في الوظيفة على الفور. إذا كانت هذه بعض الحالات التي ترغب في تجنبها ، فإن البديل الذي لا ينتج عنه خطأ OutOfRange هو tf.distribute.DistributedIterator.get_next_as_optional()
. get_next_as_optional
بإرجاع tf.experimental.Optional
الذي يحتوي على العنصر التالي أو لا يحتوي على قيمة إذا وصل tf.distribute.DistributedIterator
إلى نهايته.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') 2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_0" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9 } } attr { key: "metadata" value { s: "\n\020RangeDataset:104" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
استخدام خاصية element_spec
إذا قمت بتمرير عناصر مجموعة بيانات موزعة إلى tf.function
وتريد ضمان tf.TypeSpec
، فيمكنك تحديد وسيطة input_signature
الخاصة tf.function
. ناتج مجموعة البيانات الموزعة هو tf.distribute.DistributedValues
التي يمكن أن تمثل الإدخال إلى جهاز واحد أو أجهزة متعددة. للحصول على tf.TypeSpec
المطابق لهذه القيمة الموزعة ، يمكنك استخدام خاصية element_spec
لمجموعة البيانات الموزعة أو كائن المكرر الموزع.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:122" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
دفعات جزئية
تتم مصادفة الدُفعات الجزئية عندما قد تحتوي مثيلات tf.data.Dataset
التي ينشئها المستخدمون على أحجام دُفعات لا تقبل القسمة بالتساوي على عدد النسخ المتماثلة أو عندما لا تكون العلاقة الأساسية لمثيل مجموعة البيانات قابلة للقسمة على حجم الدُفعة. هذا يعني أنه عند توزيع مجموعة البيانات على عدة نسخ متماثلة ، فإن الاستدعاء next
على بعض التكرارات سينتج عنه OutOfRangeError. للتعامل مع حالة الاستخدام هذه ، tf.distribute
بإرجاع دفعات وهمية بحجم الدُفعة 0 على النسخ المتماثلة التي لا تحتوي على أي بيانات أخرى للمعالجة.
بالنسبة لحالة العامل المنفردة ، إذا لم يتم إرجاع البيانات بواسطة الاستدعاء next
في المكرر ، يتم إنشاء دفعات وهمية بحجم 0 دفعة واستخدامها مع البيانات الحقيقية في مجموعة البيانات. في حالة الدُفعات الجزئية ، ستحتوي آخر دفعة عالمية من البيانات على بيانات حقيقية جنبًا إلى جنب مع مجموعات وهمية من البيانات. يتحقق شرط الإيقاف لمعالجة البيانات الآن مما إذا كان أي من النسخ المتماثلة يحتوي على بيانات. إذا لم تكن هناك بيانات في أي من النسخ المتماثلة ، فسيتم طرح خطأ OutOfRange.
بالنسبة للحالة متعددة العمال ، يتم تجميع القيمة المنطقية التي تمثل وجود البيانات على كل عامل باستخدام اتصال متماثل متقاطع ويستخدم هذا لتحديد ما إذا كان جميع العمال قد انتهوا من معالجة مجموعة البيانات الموزعة. نظرًا لأن هذا ينطوي على اتصال عبر العمال ، فهناك بعض عقوبات الأداء.
تحفظات
عند استخدام
tf.distribute.Strategy.experimental_distribute_dataset
APIs مع إعداد عامل متعدد ، يمرر المستخدمونtf.data.Dataset
الذي يقرأ من الملفات. إذا تم تعيينtf.data.experimental.AutoShardPolicy
علىAUTO
أوFILE
، فقد يكون حجم الدُفعة الفعلي لكل خطوة أصغر من حجم الدُفعة العام الذي يحدده المستخدم. يمكن أن يحدث هذا عندما تكون العناصر المتبقية في الملف أقل من حجم الدُفعة العام. يمكن للمستخدمين إما استنفاد مجموعة البيانات دون الاعتماد على عدد خطوات التشغيل أو تعيينtf.data.experimental.AutoShardPolicy
إلىDATA
للتغلب عليها.تحويلات مجموعة البيانات ذات الحالة غير مدعومة حاليًا مع
tf.distribute
وأي عمليات ذات حالة قد تكون مجموعة البيانات قد تم تجاهلها حاليًا. على سبيل المثال ، إذا كانت مجموعة البيانات الخاصة بك تحتوي علىmap_fn
يستخدمtf.random.uniform
لتدوير صورة ، فلديك رسم بياني لمجموعة بيانات يعتمد على الحالة (أي البذور العشوائية) على الجهاز المحلي حيث يتم تنفيذ عملية Python.يمكن أن تتسبب الخيارات التجريبية
tf.data.experimental.OptimizationOptions
التي يتم تعطيلها افتراضيًا في سياقات معينة - مثل عند استخدامها معtf.distribute
- في تدهور الأداء. يجب عليك تمكينها فقط بعد التحقق من أنها تستفيد من أداء عبء العمل الخاص بك في إعداد التوزيع.يرجى الرجوع إلى هذا الدليل لمعرفة كيفية تحسين خط أنابيب الإدخال الخاص بك باستخدام
tf.data
بشكل عام. بعض النصائح الإضافية:إذا كان لديك العديد من العمال وكنت تستخدم
tf.data.Dataset.list_files
لإنشاء مجموعة بيانات من جميع الملفات التي تطابق واحدًا أو أكثر من أنماط الكرة الأرضية ، فتذكر تعيين الوسيطةseed
أو ضبطshuffle=False
بحيث يقوم كل عامل بتقسيم الملف باستمرار.إذا كان خط أنابيب الإدخال الخاص بك يتضمن كلاً من خلط البيانات على مستوى السجل وتحليل البيانات ، ما لم تكن البيانات غير المحللة أكبر بكثير من البيانات التي تم تحليلها (وهي ليست الحالة عادةً) ، فتبادل أولاً ثم التحليل ، كما هو موضح في المثال التالي. قد يفيد هذا استخدام الذاكرة والأداء.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
على مخزن مؤقت داخلي لعناصرbuffer_size
المخزن المؤقت ، وبالتالي تقليلbuffer_size
المخزن المؤقت يمكن أن يخفف من مشكلة OOM.الترتيب الذي تتم معالجة البيانات به بواسطة العمال عند استخدام
tf.distribute.experimental_distribute_dataset
أوtf.distribute.distribute_datasets_from_function
غير مضمون. عادة ما يكون هذا مطلوبًا إذا كنت تستخدمtf.distribute
لتوقع المقياس. ومع ذلك ، يمكنك إدراج فهرس لكل عنصر في الدُفعة وطلب المخرجات وفقًا لذلك. المقتطف التالي هو مثال على كيفية طلب المخرجات.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46} 2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_4" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9223372036854775807 } } attr { key: "metadata" value { s: "\n\020RangeDataset:162" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } }
كيف أقوم بتوزيع بياناتي إذا كنت لا أستخدم مثيل tf.data.Dataset الكنسي؟
في بعض الأحيان لا يمكن للمستخدمين استخدام tf.data.Dataset
لتمثيل مدخلاتهم وبالتالي واجهات برمجة التطبيقات المذكورة أعلاه لتوزيع مجموعة البيانات على أجهزة متعددة. في مثل هذه الحالات ، يمكنك استخدام موتر خام أو مدخلات من المولد.
استخدم دالة_توزيع_القيمة_التجريبية للدالة_الخاصة_لإدخالات الموتر التعسفية
يقبل tf.distribute.DistributedValues
strategy.run
ناتج next(iterator)
. لتمرير قيم الموتر ، استخدم experimental_distribute_values_from_function
لإنشاء tf.distribute.DistributedValues
من الموترات الأولية.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
استخدم tf.data.Dataset.from_generator إذا كان الإدخال الخاص بك من مولد
إذا كانت لديك وظيفة منشئ تريد استخدامها ، فيمكنك إنشاء مثيل tf.data.Dataset
باستخدام from_generator
API.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2" op: "FlatMapDataset" input: "TensorDataset/_1" attr { key: "Targuments" value { list { } } } attr { key: "_cardinality" value { i: -2 } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_flat_map_fn_3980" } } } attr { key: "metadata" value { s: "\n\022FlatMapDataset:178" } } attr { key: "output_shapes" value { list { shape { dim { size: 4 } } } } } attr { key: "output_types" value { list { type: DT_FLOAT } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.