المدخلات الموزعة

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

توفر واجهات برمجة التطبيقات tf.distribute طريقة سهلة للمستخدمين لتوسيع نطاق تدريبهم من جهاز واحد إلى أجهزة متعددة. عند قياس نموذجهم ، يتعين على المستخدمين أيضًا توزيع مدخلاتهم عبر أجهزة متعددة. يوفر tf.distribute واجهات برمجة تطبيقات يمكنك من خلالها توزيع إدخالاتك تلقائيًا عبر الأجهزة.

سيوضح لك هذا الدليل الطرق المختلفة التي يمكنك من خلالها إنشاء مجموعة بيانات ومكررات موزعة باستخدام tf.distribute APIs. بالإضافة إلى ذلك ، سيتم تغطية الموضوعات التالية:

لا يغطي هذا الدليل استخدام المدخلات الموزعة مع Keras APIs.

مجموعات البيانات الموزعة

لاستخدام tf.distribute APIs للتوسع ، يوصى بأن يستخدم المستخدمون tf.data.Dataset لتمثيل مدخلاتهم. تم إجراء توزيع tf.data.Dataset tf.distribute على سبيل المثال ، الجلب المسبق التلقائي للبيانات على كل جهاز تسريع) مع تحسينات الأداء التي يتم دمجها بانتظام في التنفيذ. إذا كانت لديك حالة استخدام لاستخدام شيء آخر غير tf.data.Dataset ، فيرجى الرجوع إلى قسم لاحق في هذا الدليل. في حلقة التدريب غير الموزعة ، يقوم المستخدمون أولاً بإنشاء مثيل tf.data.Dataset ثم التكرار على العناصر. فمثلا:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

للسماح للمستخدمين باستخدام إستراتيجية tf.distribute مع الحد الأدنى من التغييرات على التعليمات البرمجية الحالية للمستخدم ، تم تقديم اثنين من واجهات برمجة التطبيقات التي من شأنها توزيع مثيل tf.data.Dataset وإرجاع كائن مجموعة البيانات الموزعة. يمكن للمستخدم بعد ذلك التكرار عبر مثيل مجموعة البيانات الموزعة هذا وتدريب نموذجهم كما كان من قبل. دعونا نلقي نظرة الآن على واجهتي API - tf.distribute.Strategy.experimental_distribute_dataset و tf.distribute.Strategy.distribute_datasets_from_function :

tf.distribute.Strategy.experimental_distribute_dataset

إستعمال

تأخذ واجهة برمجة التطبيقات هذه مثيل tf.data.Dataset كإدخال وترجع مثيل tf.distribute.DistributedDataset . يجب عليك تجميع مجموعة بيانات الإدخال بقيمة تساوي حجم الدُفعة العالمي. حجم الدُفعة العالمي هذا هو عدد العينات التي تريد معالجتها عبر جميع الأجهزة في خطوة واحدة. يمكنك تكرار مجموعة البيانات الموزعة هذه بأسلوب Pythonic أو إنشاء مكرر باستخدام iter . الكائن الذي تم إرجاعه ليس مثيل tf.data.Dataset ولا يدعم أي واجهات برمجة تطبيقات أخرى تقوم بتحويل مجموعة البيانات أو فحصها بأي شكل من الأشكال. هذه هي واجهة برمجة التطبيقات الموصى بها إذا لم يكن لديك طرق محددة تريد من خلالها مشاركة إدخالاتك عبر نسخ متماثلة مختلفة.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)
2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\017TensorDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}

الخصائص

الخلط

يعيد توزيع tf.distribute نسخ الإدخال tf.data.Dataset بحجم دفعة جديد يساوي حجم الدُفعة العام مقسومًا على عدد النسخ المتماثلة قيد المزامنة. عدد النسخ المتماثلة المتزامنة يساوي عدد الأجهزة التي تشارك في تقليل التدرج اللوني أثناء التدريب. عندما يقوم المستخدم بالاتصال next ذلك على المكرر الموزع ، يتم إرجاع حجم مجموعة البيانات لكل نسخة متماثلة على كل نسخة متماثلة. سيكون عدد العناصر الأصلية لمجموعة البيانات المعاد تطابقه دائمًا مضاعفًا لعدد النسخ المتماثلة. هنا بضعة أمثلة:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • بدون توزيع:
    • الدفعة 1: [0 ، 1 ، 2 ، 3]
    • الدفعة 2: [4، 5]
    • مع توزيع أكثر من نسختين متماثلة. يتم تقسيم الدفعة الأخيرة ([4 ، 5]) بين نسختين متماثلتين.

    • الدفعة 1:

      • النسخة المتماثلة 1: [0، 1]
      • نسخة 2: [2، 3]
    • الدفعة 2:

      • نسخة 2: [4]
      • نسخة 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • بدون توزيع:
    • الدفعة 1: [[0] ، [1] ، [2] ، [3]]
    • مع التوزيع على 5 نسخ طبق الأصل:
    • الدفعة 1:
      • النسخة المتماثلة 1: [0]
      • النسخة المتماثلة 2: [1]
      • نسخة 3: [2]
      • نسخة 4: [3]
      • النسخة المكررة 5: []
  • tf.data.Dataset.range(8).batch(4)

    • بدون توزيع:
    • الدفعة 1: [0 ، 1 ، 2 ، 3]
    • الدفعة 2: [4 ، 5 ، 6 ، 7]
    • مع التوزيع على 3 نسخ طبق الأصل:
    • الدفعة 1:
      • النسخة المتماثلة 1: [0، 1]
      • نسخة 2: [2، 3]
      • نسخة 3: []
    • الدفعة 2:
      • نسخة 1: [4، 5]
      • نسخة 2: [6، 7]
      • نسخة 3: []

تحتوي إعادة تجميع مجموعة البيانات على تعقيد مساحة يزداد خطيًا مع عدد النسخ المتماثلة. هذا يعني أنه بالنسبة لحالة استخدام التدريب متعدد العمال ، يمكن أن يتعرض خط أنابيب الإدخال لأخطاء OOM.

التشرذم

tf.distribute أيضًا بالتوزيع التلقائي لمجموعة بيانات الإدخال في تدريب متعدد العمال باستخدام MultiWorkerMirroredStrategy و TPUStrategy . يتم إنشاء كل مجموعة بيانات على جهاز CPU الخاص بالعامل. تعني المشاركة التلقائية لمجموعة بيانات عبر مجموعة من العمال أنه يتم تعيين مجموعة فرعية من مجموعة البيانات بأكملها لكل عامل (إذا تم تعيين tf.data.experimental.AutoShardPolicy الصحيح). هذا لضمان أنه في كل خطوة ، سيتم معالجة حجم دفعة عالمية لعناصر مجموعة البيانات غير المتداخلة بواسطة كل عامل. يحتوي Autosharding على خيارين مختلفين يمكن تحديدهما باستخدام tf.data.experimental.DistributeOptions . لاحظ أنه لا توجد مشاركة تلقائية في تدريب العمال المتعددين باستخدام ParameterServerStrategy ، ويمكن العثور على مزيد من المعلومات حول إنشاء مجموعة البيانات باستخدام هذه الإستراتيجية في البرنامج التعليمي Parameter Server Strategy .

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

توجد ثلاثة خيارات مختلفة يمكنك تعيينها لـ tf.data.experimental.AutoShardPolicy :

  • تلقائي: هذا هو الخيار الافتراضي مما يعني أنه سيتم إجراء محاولة لجزء بواسطة FILE. تفشل محاولة التجزئة حسب FILE إذا لم يتم اكتشاف مجموعة بيانات قائمة على ملف. ثم يعود tf.distribute إلى التجزئة حسب البيانات. لاحظ أنه إذا كانت مجموعة بيانات الإدخال تستند إلى ملف ولكن عدد الملفات أقل من عدد العمال ، فسيتم رفع خطأ InvalidArgumentError . إذا حدث هذا ، فاضبط السياسة صراحةً على AutoShardPolicy.DATA ، أو قسّم مصدر الإدخال إلى ملفات أصغر بحيث يكون عدد الملفات أكبر من عدد العمال.
  • FILE: هذا هو الخيار إذا كنت تريد تقسيم ملفات الإدخال إلى جميع العاملين. يجب عليك استخدام هذا الخيار إذا كان عدد ملفات الإدخال أكبر بكثير من عدد العمال والبيانات الموجودة في الملفات موزعة بالتساوي. يتمثل الجانب السلبي لهذا الخيار في وجود عمال خاملين إذا لم يتم توزيع البيانات الموجودة في الملفات بالتساوي. إذا كان عدد الملفات أقل من عدد العمال ، فسيتم رفع خطأ InvalidArgumentError . إذا حدث هذا ، فاضبط السياسة صراحةً على AutoShardPolicy.DATA . على سبيل المثال ، دعنا نوزع ملفين على عاملين مع نسخة متماثلة واحدة لكل منهما. يحتوي الملف 1 على [0 ، 1 ، 2 ، 3 ، 4 ، 5] ويحتوي الملف 2 على [6 ، 7 ، 8 ، 9 ، 10 ، 11]. اجعل العدد الإجمالي للنسخ المتماثلة المتزامنة 2 ويكون حجم الدُفعة العام 4.

    • عامل 0:
    • الدفعة 1 = النسخة المتماثلة 1: [0 ، 1]
    • الدفعة 2 = النسخة المتماثلة 1: [2، 3]
    • الدفعة 3 = النسخة المتماثلة 1: [4]
    • الدفعة 4 = النسخة المتماثلة 1: [5]
    • العامل 1:
    • الدفعة 1 = النسخة المتماثلة 2: [6 ، 7]
    • الدفعة 2 = النسخة المتماثلة 2: [8 ، 9]
    • الدفعة 3 = النسخة المتماثلة 2: [10]
    • الدفعة 4 = النسخة المتماثلة 2: [11]
  • البيانات: سيؤدي هذا إلى تجميع العناصر تلقائيًا عبر جميع العمال. سيقرأ كل عامل مجموعة البيانات بأكملها ويعالج الجزء المخصص لها فقط. سيتم التخلص من جميع القطع الأخرى. يتم استخدام هذا بشكل عام إذا كان عدد ملفات الإدخال أقل من عدد العمال وتريد تقسيم البيانات بشكل أفضل عبر جميع العاملين. الجانب السلبي هو أن مجموعة البيانات بأكملها ستتم قراءتها على كل عامل. على سبيل المثال ، دعونا نوزع ملفًا واحدًا على عاملين. يحتوي الملف 1 على [0 ، 1 ، 2 ، 3 ، 4 ، 5 ، 6 ، 7 ، 8 ، 9 ، 10 ، 11]. اجعل العدد الإجمالي للنسخ المتماثلة المتزامنة 2.

    • عامل 0:
    • الدفعة 1 = النسخة المتماثلة 1: [0 ، 1]
    • الدفعة 2 = النسخة المتماثلة 1: [4 ، 5]
    • الدفعة 3 = النسخة المتماثلة 1: [8 ، 9]
    • العامل 1:
    • الدفعة 1 = النسخة المتماثلة 2: [2 ، 3]
    • الدفعة 2 = النسخة المتماثلة 2: [6 ، 7]
    • الدفعة 3 = النسخة المتماثلة 2: [10 ، 11]
  • إيقاف التشغيل: إذا قمت بإيقاف تشغيل ميزة "المشاركة التلقائية" ، فسيقوم كل عامل بمعالجة جميع البيانات. على سبيل المثال ، دعونا نوزع ملفًا واحدًا على عاملين. يحتوي الملف 1 على [0 ، 1 ، 2 ، 3 ، 4 ، 5 ، 6 ، 7 ، 8 ، 9 ، 10 ، 11]. دع العدد الإجمالي للنسخ المتماثلة المتزامنة هو 2. ثم سيرى كل عامل التوزيع التالي:

    • عامل 0:
    • الدفعة 1 = النسخة المتماثلة 1: [0 ، 1]
    • الدفعة 2 = النسخة المتماثلة 1: [2، 3]
    • الدفعة 3 = النسخة المتماثلة 1: [4 ، 5]
    • الدفعة 4 = النسخة المتماثلة 1: [6 ، 7]
    • الدفعة 5 = النسخة المتماثلة 1: [8 ، 9]
    • الدفعة 6 = النسخة المتماثلة 1: [10 ، 11]

    • العامل 1:

    • الدفعة 1 = النسخة المتماثلة 2: [0 ، 1]

    • الدفعة 2 = النسخة المتماثلة 2: [2، 3]

    • الدفعة 3 = النسخة المتماثلة 2: [4 ، 5]

    • الدفعة 4 = النسخة المتماثلة 2: [6 ، 7]

    • الدفعة 5 = النسخة المتماثلة 2: [8 ، 9]

    • الدفعة 6 = النسخة المتماثلة 2: [10 ، 11]

الجلب المسبق

بشكل افتراضي ، يضيف tf.distribute تحويل الجلب المسبق في نهاية مثيل tf.data.Dataset المقدم من المستخدم. وسيطة تحويل الجلب المسبق والتي تكون buffer_size تساوي عدد النسخ المتماثلة المتزامنة.

tf.distribute.Strategy.distribute_datasets_from_function

إستعمال

تأخذ واجهة برمجة التطبيقات هذه دالة إدخال وترجع مثيل tf.distribute.DistributedDataset . دالة الإدخال التي يمررها المستخدمون لها وسيطة tf.distribute.InputContext ويجب أن تُرجع مثيل tf.data.Dataset . باستخدام واجهة برمجة التطبيقات هذه ، لا tf.distribute أية تغييرات أخرى على مثيل tf.data.Dataset الخاص بالمستخدم الذي تم إرجاعه من دالة الإدخال. تقع على عاتق المستخدم مسؤولية تجميع مجموعة البيانات وتقسيمها. يستدعي tf.distribute وظيفة الإدخال على جهاز وحدة المعالجة المركزية لكل عامل. بصرف النظر عن السماح للمستخدمين بتحديد منطق التجميع والتجزئة الخاص بهم ، توضح واجهة برمجة التطبيقات هذه أيضًا قابلية تطوير وأداء أفضل مقارنةً بـ tf.distribute.Strategy.experimental_distribute_dataset عند استخدامها لتدريب العديد من العمال.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

الخصائص

الخلط

يجب تجميع مثيل tf.data.Dataset الذي يمثل قيمة الإرجاع لوظيفة الإدخال في دُفعات باستخدام حجم الدُفعة لكل نسخة متماثلة. حجم الدُفعة لكل نسخة متماثلة هو حجم الدُفعة العمومي مقسومًا على عدد النسخ المتماثلة التي تشارك في تدريب المزامنة. هذا لأن tf.distribute يستدعي وظيفة الإدخال على جهاز وحدة المعالجة المركزية لكل عامل. يجب أن تكون مجموعة البيانات التي تم إنشاؤها على عامل معين جاهزة للاستخدام من قبل جميع النسخ المتماثلة الموجودة على هذا العامل.

التشرذم

يتم إنشاء الكائن tf.distribute.InputContext الذي يتم تمريره ضمنيًا كوسيطة إلى وظيفة الإدخال الخاصة بالمستخدم بواسطة tf.distribute أسفل الغطاء. يحتوي على معلومات حول عدد العمال ومعرف العامل الحالي وما إلى ذلك. يمكن لوظيفة الإدخال هذه معالجة التجزئة وفقًا للسياسات التي حددها المستخدم باستخدام هذه الخصائص التي تعد جزءًا من كائن tf.distribute.InputContext .

الجلب المسبق

لا يضيف tf.distribute تحويل الجلب المسبق في نهاية tf.data.Dataset التي يتم إرجاعها بواسطة وظيفة الإدخال التي يوفرها المستخدم.

تكرارات موزعة

على غرار مثيلات tf.data.Dataset غير الموزعة ، ستحتاج إلى إنشاء مكرر في مثيلات tf.distribute.DistributedDataset والوصول إلى العناصر الموجودة في tf.distribute.DistributedDataset . فيما يلي الطرق التي يمكنك من خلالها إنشاء tf.distribute.DistributedIterator واستخدامه لتدريب نموذجك:

الأعراف

استخدم بنية Pythonic for loop

يمكنك استخدام حلقة Pythonic سهلة الاستخدام للتكرار عبر tf.distribute.DistributedDataset . العناصر التي تم إرجاعها من tf.distribute.DistributedIterator يمكن أن تكون tf.Tensor واحد أو tf.distribute.DistributedValues التي تحتوي على قيمة لكل نسخة متماثلة. سيؤدي وضع الحلقة داخل tf.function إلى تعزيز الأداء. ومع ذلك ، فإن break return غير مدعومين حاليًا لحلقة عبر tf.distribute.DistributedDataset الموضوعة داخل tf.function .

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020TensorDataset:29"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

استخدم iter لإنشاء مكرر صريح

للتكرار على العناصر في مثيل tf.distribute.DistributedDataset ، يمكنك إنشاء tf.distribute.DistributedIterator باستخدام واجهة برمجة التطبيقات iter عليه. باستخدام مكرر صريح ، يمكنك التكرار لعدد ثابت من الخطوات. من أجل الحصول على العنصر التالي من tf.distribute.DistributedIterator مثيل dist_iterator ، يمكنك استدعاء next(dist_iterator) أو dist_iterator.get_next() أو dist_iterator.get_next_as_optional() . السابقان هما في الأساس نفس الشيء:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

مع next() أو tf.distribute.DistributedIterator.get_next() ، إذا وصل tf.distribute.DistributedIterator إلى نهايته ، فسيتم طرح خطأ OutOfRange. يمكن للعميل اكتشاف الخطأ من جانب Python ومواصلة القيام بأعمال أخرى مثل نقاط التفتيش والتقييم. ومع ذلك ، لن ينجح هذا إذا كنت تستخدم حلقة تدريب مضيف (على سبيل المثال ، قم بتشغيل خطوات متعددة لكل tf.function ) ، والتي تبدو كما يلي:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

يحتوي train_fn على خطوات متعددة عن طريق لف جسم الخطوة داخل نطاق tf.range . في هذه الحالة ، يمكن أن تبدأ التكرارات المختلفة في الحلقة بدون تبعية بالتوازي ، لذلك يمكن تشغيل خطأ OutOfRange في التكرارات اللاحقة قبل انتهاء حساب التكرارات السابقة. بمجرد إلقاء خطأ OutOfRange ، سيتم إنهاء جميع العمليات في الوظيفة على الفور. إذا كانت هذه بعض الحالات التي ترغب في تجنبها ، فإن البديل الذي لا ينتج عنه خطأ OutOfRange هو tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional بإرجاع tf.experimental.Optional الذي يحتوي على العنصر التالي أو لا يحتوي على قيمة إذا وصل tf.distribute.DistributedIterator إلى نهايته.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_0"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:104"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
([0 1], [2 3])
([4 5], [6 7])
([8], [])

استخدام خاصية element_spec

إذا قمت بتمرير عناصر مجموعة بيانات موزعة إلى tf.function وتريد ضمان tf.TypeSpec ، فيمكنك تحديد وسيطة input_signature الخاصة tf.function . ناتج مجموعة البيانات الموزعة هو tf.distribute.DistributedValues التي يمكن أن تمثل الإدخال إلى جهاز واحد أو أجهزة متعددة. للحصول على tf.TypeSpec المطابق لهذه القيمة الموزعة ، يمكنك استخدام خاصية element_spec لمجموعة البيانات الموزعة أو كائن المكرر الموزع.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\021TensorDataset:122"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

دفعات جزئية

تتم مصادفة الدُفعات الجزئية عندما قد تحتوي مثيلات tf.data.Dataset التي ينشئها المستخدمون على أحجام دُفعات لا تقبل القسمة بالتساوي على عدد النسخ المتماثلة أو عندما لا تكون العلاقة الأساسية لمثيل مجموعة البيانات قابلة للقسمة على حجم الدُفعة. هذا يعني أنه عند توزيع مجموعة البيانات على عدة نسخ متماثلة ، فإن الاستدعاء next على بعض التكرارات سينتج عنه OutOfRangeError. للتعامل مع حالة الاستخدام هذه ، tf.distribute بإرجاع دفعات وهمية بحجم الدُفعة 0 على النسخ المتماثلة التي لا تحتوي على أي بيانات أخرى للمعالجة.

بالنسبة لحالة العامل المنفردة ، إذا لم يتم إرجاع البيانات بواسطة الاستدعاء next في المكرر ، يتم إنشاء دفعات وهمية بحجم 0 دفعة واستخدامها مع البيانات الحقيقية في مجموعة البيانات. في حالة الدُفعات الجزئية ، ستحتوي آخر دفعة عالمية من البيانات على بيانات حقيقية جنبًا إلى جنب مع مجموعات وهمية من البيانات. يتحقق شرط الإيقاف لمعالجة البيانات الآن مما إذا كان أي من النسخ المتماثلة يحتوي على بيانات. إذا لم تكن هناك بيانات في أي من النسخ المتماثلة ، فسيتم طرح خطأ OutOfRange.

بالنسبة للحالة متعددة العمال ، يتم تجميع القيمة المنطقية التي تمثل وجود البيانات على كل عامل باستخدام اتصال متماثل متقاطع ويستخدم هذا لتحديد ما إذا كان جميع العمال قد انتهوا من معالجة مجموعة البيانات الموزعة. نظرًا لأن هذا ينطوي على اتصال عبر العمال ، فهناك بعض عقوبات الأداء.

تحفظات

  • عند استخدام tf.distribute.Strategy.experimental_distribute_dataset APIs مع إعداد عامل متعدد ، يمرر المستخدمون tf.data.Dataset الذي يقرأ من الملفات. إذا تم تعيين tf.data.experimental.AutoShardPolicy على AUTO أو FILE ، فقد يكون حجم الدُفعة الفعلي لكل خطوة أصغر من حجم الدُفعة العام الذي يحدده المستخدم. يمكن أن يحدث هذا عندما تكون العناصر المتبقية في الملف أقل من حجم الدُفعة العام. يمكن للمستخدمين إما استنفاد مجموعة البيانات دون الاعتماد على عدد خطوات التشغيل أو تعيين tf.data.experimental.AutoShardPolicy إلى DATA للتغلب عليها.

  • تحويلات مجموعة البيانات ذات الحالة غير مدعومة حاليًا مع tf.distribute وأي عمليات ذات حالة قد تكون مجموعة البيانات قد تم تجاهلها حاليًا. على سبيل المثال ، إذا كانت مجموعة البيانات الخاصة بك تحتوي على map_fn يستخدم tf.random.uniform لتدوير صورة ، فلديك رسم بياني لمجموعة بيانات يعتمد على الحالة (أي البذور العشوائية) على الجهاز المحلي حيث يتم تنفيذ عملية Python.

  • يمكن أن تتسبب الخيارات التجريبية tf.data.experimental.OptimizationOptions التي يتم تعطيلها افتراضيًا في سياقات معينة - مثل عند استخدامها مع tf.distribute - في تدهور الأداء. يجب عليك تمكينها فقط بعد التحقق من أنها تستفيد من أداء عبء العمل الخاص بك في إعداد التوزيع.

  • يرجى الرجوع إلى هذا الدليل لمعرفة كيفية تحسين خط أنابيب الإدخال الخاص بك باستخدام tf.data بشكل عام. بعض النصائح الإضافية:

    • إذا كان لديك العديد من العمال وكنت تستخدم tf.data.Dataset.list_files لإنشاء مجموعة بيانات من جميع الملفات التي تطابق واحدًا أو أكثر من أنماط الكرة الأرضية ، فتذكر تعيين الوسيطة seed أو ضبط shuffle=False بحيث يقوم كل عامل بتقسيم الملف باستمرار.

    • إذا كان خط أنابيب الإدخال الخاص بك يتضمن كلاً من خلط البيانات على مستوى السجل وتحليل البيانات ، ما لم تكن البيانات غير المحللة أكبر بكثير من البيانات التي تم تحليلها (وهي ليست الحالة عادةً) ، فتبادل أولاً ثم التحليل ، كما هو موضح في المثال التالي. قد يفيد هذا استخدام الذاكرة والأداء.

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) على مخزن مؤقت داخلي لعناصر buffer_size المخزن المؤقت ، وبالتالي تقليل buffer_size المخزن المؤقت يمكن أن يخفف من مشكلة OOM.

  • الترتيب الذي تتم معالجة البيانات به بواسطة العمال عند استخدام tf.distribute.experimental_distribute_dataset أو tf.distribute.distribute_datasets_from_function غير مضمون. عادة ما يكون هذا مطلوبًا إذا كنت تستخدم tf.distribute لتوقع المقياس. ومع ذلك ، يمكنك إدراج فهرس لكل عنصر في الدُفعة وطلب المخرجات وفقًا لذلك. المقتطف التالي هو مثال على كيفية طلب المخرجات.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}
2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_4"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9223372036854775807
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:162"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

كيف أقوم بتوزيع بياناتي إذا كنت لا أستخدم مثيل tf.data.Dataset الكنسي؟

في بعض الأحيان لا يمكن للمستخدمين استخدام tf.data.Dataset لتمثيل مدخلاتهم وبالتالي واجهات برمجة التطبيقات المذكورة أعلاه لتوزيع مجموعة البيانات على أجهزة متعددة. في مثل هذه الحالات ، يمكنك استخدام موتر خام أو مدخلات من المولد.

استخدم دالة_توزيع_القيمة_التجريبية للدالة_الخاصة_لإدخالات الموتر التعسفية

يقبل tf.distribute.DistributedValues strategy.run ناتج next(iterator) . لتمرير قيم الموتر ، استخدم experimental_distribute_values_from_function لإنشاء tf.distribute.DistributedValues من الموترات الأولية.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

استخدم tf.data.Dataset.from_generator إذا كان الإدخال الخاص بك من مولد

إذا كانت لديك وظيفة منشئ تريد استخدامها ، فيمكنك إنشاء مثيل tf.data.Dataset باستخدام from_generator API.

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2"
op: "FlatMapDataset"
input: "TensorDataset/_1"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: -2
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_flat_map_fn_3980"
    }
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\022FlatMapDataset:178"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 4
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_FLOAT
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.