مشاهده در TensorFlow.org | در Google Colab اجرا شود | مشاهده منبع در GitHub | دانلود دفترچه یادداشت |
API های tf.distribute راه آسانی را برای کاربران فراهم می کند تا آموزش خود را از یک ماشین به چندین ماشین مقیاس کنند. هنگام مقیاس بندی مدل خود، کاربران همچنین باید ورودی خود را در چندین دستگاه توزیع کنند. tf.distribute
API هایی را ارائه می دهد که با استفاده از آنها می توانید ورودی خود را به طور خودکار در بین دستگاه ها توزیع کنید.
این راهنما راههای مختلفی را به شما نشان میدهد که از طریق آن میتوانید مجموعه دادهها و تکرارکنندههای توزیع شده را با استفاده از tf.distribute
API ایجاد کنید. علاوه بر این، موضوعات زیر پوشش داده خواهد شد:
- گزینههای استفاده، اشتراکگذاری و دستهبندی هنگام استفاده از
tf.distribute.Strategy.experimental_distribute_dataset
وtf.distribute.Strategy.distribute_datasets_from_function
. - روشهای مختلفی که در آن میتوانید روی مجموعه داده توزیعشده تکرار کنید.
- تفاوتهای بین
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
و APIهایtf.data
و همچنین محدودیتهایی که کاربران ممکن است در استفاده از آنها با آن مواجه شوند.
این راهنما استفاده از ورودی توزیع شده با Keras API را پوشش نمی دهد.
مجموعه داده های توزیع شده
برای استفاده از tf.distribute
APIها در مقیاس، توصیه می شود که کاربران از tf.data.Dataset
برای نمایش ورودی خود استفاده کنند. tf.distribute
به گونه ای ساخته شده است که با tf.data.Dataset
(به عنوان مثال، واکشی اولیه خودکار داده ها روی هر دستگاه شتاب دهنده) با بهینه سازی عملکرد به طور منظم در پیاده سازی کار کند. اگر مورد استفاده برای استفاده از چیزی غیر از tf.data.Dataset
دارید، لطفاً به بخش بعدی در این راهنما مراجعه کنید. در یک حلقه آموزشی غیر توزیع شده، کاربران ابتدا یک نمونه tf.data.Dataset
ایجاد می کنند و سپس روی عناصر تکرار می کنند. مثلا:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
برای اینکه کاربران بتوانند از استراتژی tf.distribute
با حداقل تغییرات در کد موجود کاربر استفاده کنند، دو API معرفی شدند که یک نمونه tf.data.Dataset
را توزیع کرده و یک شیء مجموعه داده توزیع شده را برمی گرداند. سپس یک کاربر می تواند روی این نمونه داده توزیع شده تکرار کند و مدل خود را مانند قبل آموزش دهد. اجازه دهید اکنون به دو API - tf.distribute.Strategy.experimental_distribute_dataset
و tf.distribute.Strategy.distribute_datasets_from_function
با جزئیات بیشتر نگاه کنیم:
tf.distribute.Strategy.experimental_distribute_dataset
استفاده
این API یک نمونه tf.data.Dataset
را به عنوان ورودی می گیرد و یک نمونه tf.distribute.DistributedDataset
را برمی گرداند. باید مجموعه داده ورودی را با مقداری برابر با اندازه دسته کلی دسته بندی کنید. این اندازه دسته کلی تعداد نمونههایی است که میخواهید در همه دستگاهها در یک مرحله پردازش کنید. می توانید روی این مجموعه داده توزیع شده به روش پایتونیک تکرار کنید یا با استفاده از iter
یک تکرار کننده ایجاد کنید. شیء برگشتی یک نمونه tf.data.Dataset
نیست و از هیچ API دیگری که به هیچ وجه مجموعه داده را تبدیل یا بازرسی می کند، پشتیبانی نمی کند. اگر راههای خاصی ندارید که بخواهید ورودی خود را روی کپیهای مختلف تقسیم کنید، این API توصیه میشود.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>) 2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
خواص
دسته بندی
tf.distribute
نمونه ورودی tf.data.Dataset
را با یک اندازه دسته جدید که برابر است با اندازه دسته کلی تقسیم بر تعداد کپی های همگام، مجدداً جمع می کند. تعداد کپیهای همگامسازی شده برابر است با تعداد دستگاههایی که در گرادیان شرکت میکنند و در طول آموزش کاهش مییابند. هنگامی که کاربر با تکرار کننده توزیع شده تماس next
را می گیرد، اندازه دسته ای از داده ها به ازای هر ماکت بر روی هر ماکت برگردانده می شود. کاردینالیته مجموعه داده مجدداً چند برابری از تعداد تکرارها خواهد بود. در اینجا چند نمونه آورده شده است:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- بدون توزیع:
- دسته 1: [0، 1، 2، 3]
- دسته 2: [4، 5]
با توزیع بیش از 2 ماکت. آخرین دسته ([4، 5]) بین 2 کپی تقسیم می شود.
دسته 1:
- Replica 1:[0, 1]
- Replica 2:[2, 3]
دسته 2:
- ماکت 2: [4]
- ماکت 2: [5]
tf.data.Dataset.range(4).batch(4)
- بدون توزیع:
- دسته 1: [[0]، [1]، [2]، [3]]
- با توزیع بیش از 5 کپی:
- دسته 1:
- ماکت 1: [0]
- ماکت 2: [1]
- ماکت 3: [2]
- ماکت 4: [3]
- ماکت 5: []
tf.data.Dataset.range(8).batch(4)
- بدون توزیع:
- دسته 1: [0، 1، 2، 3]
- دسته 2: [4، 5، 6، 7]
- با توزیع بیش از 3 کپی:
- دسته 1:
- ماکت 1: [0، 1]
- ماکت 2: [2، 3]
- ماکت 3: []
- دسته 2:
- ماکت 1: [4، 5]
- ماکت 2: [6، 7]
- ماکت 3: []
بازگردانی مجموعه داده دارای پیچیدگی فضایی است که به صورت خطی با تعداد کپی ها افزایش می یابد. این بدان معنی است که برای استفاده از آموزش چند کارگری، خط لوله ورودی می تواند با خطاهای OOM مواجه شود.
شاردینگ
tf.distribute
همچنین مجموعه داده ورودی را در آموزش چند کارگری با MultiWorkerMirroredStrategy
و TPUStrategy
می کند. هر مجموعه داده بر روی دستگاه CPU کارگر ایجاد می شود. اشتراک گذاری خودکار یک مجموعه داده روی مجموعه ای از کارگران به این معنی است که به هر کارگر زیر مجموعه ای از کل مجموعه داده اختصاص داده می شود (اگر tf.data.experimental.AutoShardPolicy
سمت راست تنظیم شده باشد). این برای اطمینان از این است که در هر مرحله، یک اندازه دسته ای جهانی از عناصر داده غیرهمپوشانی توسط هر کارگر پردازش می شود. Autosharding چند گزینه مختلف دارد که می توان آنها را با استفاده از tf.data.experimental.DistributeOptions
مشخص کرد. توجه داشته باشید که در آموزش چند کارگری با ParameterServerStrategy
، هیچ autosharding وجود ندارد و اطلاعات بیشتر در مورد ایجاد مجموعه داده با این استراتژی را میتوانید در آموزش استراتژی Parameter Server بیابید .
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
سه گزینه مختلف وجود دارد که می توانید برای tf.data.experimental.AutoShardPolicy
تنظیم کنید:
- AUTO: این گزینه پیش فرض است که به این معنی است که سعی می شود توسط FILE به اشتراک گذاشته شود. اگر یک مجموعه داده مبتنی بر فایل شناسایی نشود، تلاش برای خرد کردن توسط FILE با شکست مواجه میشود. سپس
tf.distribute
به اشتراک گذاری توسط DATA برمی گردد. توجه داشته باشید که اگر مجموعه داده ورودی مبتنی بر فایل باشد اما تعداد فایلها کمتر از تعداد کارگران باشد، یکInvalidArgumentError
. اگر این اتفاق افتاد، صراحتاً خطمشی را رویAutoShardPolicy.DATA
تنظیم کنید، یا منبع ورودی خود را به فایلهای کوچکتر تقسیم کنید تا تعداد فایلها از تعداد کارگران بیشتر باشد. FILE: این گزینه ای است که می خواهید فایل های ورودی را روی همه کارگران خرد کنید. اگر تعداد فایل های ورودی بسیار بیشتر از تعداد کارگران است و داده های موجود در فایل ها به طور مساوی توزیع شده اند، باید از این گزینه استفاده کنید. نقطه ضعف این گزینه وجود کارگران بیکار است در صورتی که داده ها در فایل ها به طور مساوی توزیع نشده باشند. اگر تعداد فایل ها کمتر از تعداد کارگران باشد، یک
InvalidArgumentError
مطرح می شود. اگر این اتفاق افتاد، صراحتاً خطمشی را رویAutoShardPolicy.DATA
تنظیم کنید. به عنوان مثال، اجازه دهید 2 فایل را روی 2 کارگر با 1 ماکت توزیع کنیم. فایل 1 شامل [0، 1، 2، 3، 4، 5] و فایل 2 حاوی [6، 7، 8، 9، 10، 11] است. اجازه دهید تعداد کل کپیها به صورت همگام 2 و اندازه دسته جهانی 4 باشد.- کارگر 0:
- دسته 1 = ماکت 1: [0، 1]
- دسته 2 = ماکت 1: [2، 3]
- دسته 3 = ماکت 1: [4]
- دسته 4 = ماکت 1: [5]
- کارگر 1:
- دسته 1 = ماکت 2: [6، 7]
- دسته 2 = ماکت 2: [8، 9]
- دسته 3 = ماکت 2: [10]
- دسته 4 = ماکت 2: [11]
DATA: این کار عناصر را در همه کارگران به صورت خودکار خرد می کند. هر یک از کارگران کل مجموعه داده را می خوانند و فقط قطعه اختصاص داده شده به آن را پردازش می کنند. تمام خرده های دیگر دور ریخته خواهند شد. این معمولاً در صورتی استفاده میشود که تعداد فایلهای ورودی کمتر از تعداد کارگران باشد و بخواهید اشتراکگذاری بهتری از دادهها در همه کارگران داشته باشید. نکته منفی این است که کل مجموعه داده در هر کارگر خوانده می شود. به عنوان مثال، اجازه دهید 1 فایل را روی 2 کارگر توزیع کنیم. فایل 1 شامل [0، 1، 2، 3، 4، 5، 6، 7، 8، 9، 10، 11] است. تعداد کل کپی های همگام شده را 2 عدد بگذارید.
- کارگر 0:
- دسته 1 = ماکت 1: [0، 1]
- دسته 2 = ماکت 1: [4، 5]
- دسته 3 = ماکت 1: [8، 9]
- کارگر 1:
- دسته 1 = ماکت 2: [2، 3]
- دسته 2 = ماکت 2: [6، 7]
- دسته 3 = ماکت 2: [10، 11]
OFF: اگر Autosharding را خاموش کنید، هر کارگر تمام داده ها را پردازش می کند. به عنوان مثال، اجازه دهید 1 فایل را روی 2 کارگر توزیع کنیم. فایل 1 شامل [0، 1، 2، 3، 4، 5، 6، 7، 8، 9، 10، 11] است. بگذارید تعداد کل کپیهای همگامسازی شده 2 باشد. سپس هر کارگر توزیع زیر را میبیند:
- کارگر 0:
- دسته 1 = ماکت 1: [0، 1]
- دسته 2 = ماکت 1: [2، 3]
- دسته 3 = ماکت 1: [4، 5]
- دسته 4 = ماکت 1: [6، 7]
- دسته 5 = ماکت 1: [8، 9]
دسته 6 = ماکت 1: [10، 11]
کارگر 1:
دسته 1 = ماکت 2: [0، 1]
دسته 2 = ماکت 2: [2، 3]
دسته 3 = ماکت 2: [4، 5]
دسته 4 = ماکت 2: [6، 7]
دسته 5 = ماکت 2: [8، 9]
دسته 6 = ماکت 2: [10، 11]
پیش واکشی
بهطور پیشفرض، tf.distribute
یک تبدیل واکشی پیشفرض در انتهای نمونه tf.data.Dataset
ارائهشده توسط کاربر اضافه میکند. آرگومان تبدیل پیش واکشی که buffer_size
است برابر با تعداد تکرارهای همگام است.
tf.distribute.Strategy.distribute_datasets_from_function
استفاده
این API یک تابع ورودی می گیرد و یک نمونه tf.distribute.DistributedDataset
را برمی گرداند. تابع ورودی که کاربران در آن ارسال می کنند دارای آرگومان tf.distribute.InputContext
است و باید یک نمونه tf.data.Dataset
را برگرداند. با این API، tf.distribute
هیچ تغییر دیگری در نمونه tf.data.Dataset
کاربر که از تابع ورودی برگردانده شده است، ایجاد نمی کند. مسئولیت دسته بندی و خرد کردن مجموعه داده ها بر عهده کاربر است. tf.distribute
تابع ورودی را در دستگاه CPU هر یک از کارگران فراخوانی می کند. جدا از اینکه به کاربران اجازه میدهد منطق دستهبندی و اشتراکگذاری خود را مشخص کنند، این API مقیاسپذیری و عملکرد بهتری را در مقایسه با tf.distribute.Strategy.experimental_distribute_dataset
که برای آموزش چند کارگری استفاده میشود.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
خواص
دسته بندی
نمونه tf.data.Dataset
که مقدار بازگشتی تابع ورودی است، باید با استفاده از اندازه دستهای هر تکرار دستهبندی شود. اندازه دستهای هر ماکت، اندازه دسته جهانی تقسیم بر تعداد کپیهایی است که در آموزش همگامسازی شرکت میکنند. این به این دلیل است که tf.distribute
تابع ورودی را در دستگاه CPU هر یک از کارگران فراخوانی می کند. مجموعه داده ای که روی یک کارگر معین ایجاد می شود باید برای استفاده توسط همه کپی های آن کارگر آماده باشد.
شاردینگ
شی tf.distribute.InputContext
که به طور ضمنی به عنوان آرگومان به تابع ورودی کاربر ارسال می شود توسط tf.distribute
در زیر هود ایجاد می شود. این تابع اطلاعاتی در مورد تعداد کارگران، شناسه کارگر فعلی و غیره دارد. این تابع ورودی میتواند طبق خطمشیهایی که کاربر با استفاده از این ویژگیها که بخشی از شی tf.distribute.InputContext
هستند، تقسیمبندی را انجام دهد.
پیش واکشی
tf.distribute
یک تبدیل پیش واکشی در انتهای tf.data.Dataset
که توسط تابع ورودی ارائه شده توسط کاربر برگردانده شده است، اضافه نمی کند.
تکرار کننده های توزیع شده
مشابه نمونههای غیر توزیعشده tf.data.Dataset
، باید یک تکرارکننده در نمونههای tf.distribute.DistributedDataset
ایجاد کنید تا روی آن تکرار شود و به عناصر موجود در tf.distribute.DistributedDataset
دسترسی داشته باشید. در زیر روش هایی وجود دارد که می توانید یک tf.distribute.DistributedIterator
ایجاد کنید و از آن برای آموزش مدل خود استفاده کنید:
موارد استفاده
از ساختار حلقه پایتونیک برای حلقه استفاده کنید
می توانید از یک حلقه پایتونیک کاربر پسند برای تکرار روی tf.distribute.DistributedDataset
استفاده کنید. عناصر برگردانده شده از tf.distribute.DistributedIterator
می توانند یک tf.Tensor
یا یک tf.distribute.DistributedValues
که حاوی یک مقدار برای هر ماکت است. قرار دادن حلقه در داخل یک tf.function
باعث افزایش عملکرد می شود. با این حال، break
و return
در حال حاضر برای یک حلقه روی tf.distribute.DistributedDataset
که در داخل یک tf.function
قرار می گیرد، پشتیبانی نمی شود.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\020TensorDataset:29" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
از iter
برای ایجاد یک تکرار کننده صریح استفاده کنید
برای تکرار روی عناصر در یک نمونه tf.distribute.DistributedDataset
، می توانید یک tf.distribute.DistributedIterator
با استفاده از iter
API روی آن ایجاد کنید. با یک تکرار کننده صریح، می توانید برای تعداد ثابتی از مراحل را تکرار کنید. برای دریافت عنصر بعدی از یک dist_iterator
tf.distribute.DistributedIterator
می توانید next(dist_iterator)
، dist_iterator.get_next()
یا dist_iterator.get_next_as_optional()
کنید. دو مورد قبلی در اصل یکسان هستند:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
با next()
یا tf.distribute.DistributedIterator.get_next()
، اگر tf.distribute.DistributedIterator
به پایان خود رسیده باشد، یک خطای OutOfRange پرتاب می شود. مشتری می تواند خطا را در سمت پایتون تشخیص دهد و به انجام کارهای دیگر مانند چک پوینت و ارزیابی ادامه دهد. با این حال، اگر از یک حلقه آموزشی میزبان استفاده می کنید (یعنی چندین مرحله را در هر tf.function
)، که به نظر می رسد، این کار کار نخواهد کرد:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
شامل چندین مرحله با پیچاندن بدنه استپ در یک tf.range
است. در این حالت، تکرارهای مختلف در حلقه بدون وابستگی میتوانند به صورت موازی شروع شوند، بنابراین یک خطای OutOfRange میتواند در تکرارهای بعدی قبل از اتمام محاسبه تکرارهای قبلی ایجاد شود. هنگامی که یک خطای OutOfRange پرتاب می شود، تمام عملیات در تابع بلافاصله خاتمه می یابد. اگر این موردی است که میخواهید از آن اجتناب کنید، جایگزینی که خطای OutOfRange ایجاد نمیکند، tf.distribute.DistributedIterator.get_next_as_optional()
است. get_next_as_optional
یک tf.experimental.Optional
را برمیگرداند که اگر tf.distribute.DistributedIterator
به پایان رسیده باشد، عنصر بعدی یا هیچ مقداری را ندارد.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') 2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_0" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9 } } attr { key: "metadata" value { s: "\n\020RangeDataset:104" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
با استفاده از ویژگی element_spec
اگر عناصر یک مجموعه داده توزیع شده را به یک tf.function
و یک ضمانت tf.TypeSpec
می خواهید، می توانید آرگومان input_signature
tf.function
مشخص کنید. خروجی یک مجموعه داده توزیع شده tf.distribute.DistributedValues
است که می تواند ورودی یک دستگاه یا چندین دستگاه را نشان دهد. برای بدست آوردن tf.TypeSpec
مربوط به این مقدار توزیع شده، می توانید از ویژگی element_spec
مجموعه داده توزیع شده یا شی تکرار کننده توزیع شده استفاده کنید.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:122" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
دسته های جزئی
هنگامی که نمونههای tf.data.Dataset
که کاربران ایجاد میکنند ممکن است شامل اندازههای دستهای باشد که به طور مساوی بر تعداد کپیها تقسیم نمیشوند یا زمانی که اصلی بودن نمونه مجموعه داده بر اندازه دستهای تقسیمپذیر نباشد، با دستههای جزئی مواجه میشویم. این بدان معناست که وقتی مجموعه داده بر روی چندین نسخه توزیع میشود، فراخوانی next
در برخی تکرارکنندهها منجر به خطای OutOfRange میشود. برای رسیدگی به این مورد استفاده، tf.distribute
دستههای ساختگی با اندازه دستهای 0 را روی کپیهایی که داده دیگری برای پردازش ندارند، برمیگرداند.
برای مورد تک کارگر، اگر دادهها توسط تماس next
در تکرارکننده بازگردانده نشود، دستههای ساختگی با اندازه دستهای 0 ایجاد میشوند و همراه با دادههای واقعی در مجموعه داده استفاده میشوند. در مورد دستههای جزئی، آخرین دسته جهانی دادهها شامل دادههای واقعی در کنار دستههای ساختگی دادهها خواهد بود. شرایط توقف برای پردازش داده ها اکنون بررسی می کند که آیا هر یک از ماکت ها داده ای دارند یا خیر. اگر هیچ داده ای در مورد هیچ یک از کپی ها وجود نداشته باشد، یک خطای OutOfRange پرتاب می شود.
برای مورد چند کارگری، مقدار بولی که نشاندهنده حضور دادهها در هر یک از کارگران است با استفاده از ارتباط متقابل تکراری جمعآوری میشود و برای شناسایی اینکه آیا همه کارگران پردازش مجموعه داده توزیعشده را به پایان رساندهاند، استفاده میشود. از آنجایی که این شامل ارتباط متقابل کارگری است، جریمه عملکردی نیز در بر دارد.
هشدارها
هنگام استفاده از APIهای
tf.distribute.Strategy.experimental_distribute_dataset
با راهاندازی چند کارگر، کاربران یکtf.data.Dataset
را ارسال میکنند که از فایلها میخواند. اگرtf.data.experimental.AutoShardPolicy
رویAUTO
یاFILE
تنظیم شده باشد، اندازه دسته واقعی در هر مرحله ممکن است کوچکتر از اندازه دسته کلی تعریف شده توسط کاربر باشد. این می تواند زمانی اتفاق بیفتد که عناصر باقی مانده در فایل کمتر از اندازه دسته ای جهانی باشند. کاربران میتوانند مجموعه داده را بدون بسته به تعداد مراحل اجرا، خسته کنند یاtf.data.experimental.AutoShardPolicy
را رویDATA
تنظیم کنند تا دور آن کار کنند.تبدیلهای مجموعه داده حالتی در حال حاضر با
tf.distribute
پشتیبانی نمیشوند و هرگونه عملیات حالتی که مجموعه داده ممکن است داشته باشد در حال حاضر نادیده گرفته میشود. به عنوان مثال، اگر مجموعه داده شما دارای یکmap_fn
است که ازtf.random.uniform
برای چرخاندن یک تصویر استفاده می کند، آنگاه یک نمودار مجموعه داده دارید که به حالت (یعنی دانه تصادفی) در ماشین محلی که در آن فرآیند پایتون در حال اجرا است بستگی دارد.Experimental
tf.data.experimental.OptimizationOptions
هایی که به طور پیش فرض غیرفعال می شوند می توانند در زمینه های خاصی -- مانند زمانی که همراه باtf.distribute
استفاده می شوند -- باعث کاهش عملکرد شوند. شما باید آنها را فقط پس از تأیید اعتبار آنها فعال کنید که عملکرد حجم کاری شما در یک تنظیم توزیع مفید است.لطفاً برای نحوه بهینه سازی خط لوله ورودی خود با
tf.data
به طور کلی به این راهنما مراجعه کنید. چند نکته اضافی:اگر چندین کارگر دارید و از
tf.data.Dataset.list_files
برای ایجاد یک مجموعه داده از همه فایلهای منطبق با یک یا چند الگوی glob استفاده میکنید، به یاد داشته باشید که آرگومانseed
را تنظیم کنید یاshuffle=False
را طوری تنظیم کنید که هر کارگر فایل را به طور پیوسته خرد کند.اگر خط لوله ورودی شما شامل مخلوط کردن داده ها در سطح رکورد و تجزیه داده ها می شود، مگر اینکه داده های تجزیه نشده به طور قابل توجهی بزرگتر از داده های تجزیه شده باشد (که معمولاً اینطور نیست)، ابتدا مخلوط کنید و سپس تجزیه کنید، همانطور که در مثال زیر نشان داده شده است. این ممکن است برای استفاده و عملکرد حافظه مفید باشد.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
یک بافر داخلی از عناصرbuffer_size
را حفظ می کند، و بنابراین کاهشbuffer_size
می تواند مشکل OOM را کاهش دهد.ترتیب پردازش داده ها توسط کارگران هنگام استفاده از
tf.distribute.experimental_distribute_dataset
یاtf.distribute.distribute_datasets_from_function
تضمین نمی شود. اگر ازtf.distribute
برای پیشبینی مقیاس استفاده میکنید، معمولاً این مورد ضروری است. با این حال، می توانید برای هر عنصر در دسته یک شاخص درج کنید و خروجی ها را بر اساس آن سفارش دهید. قطعه زیر نمونه ای از نحوه سفارش خروجی ها است.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46} 2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_4" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9223372036854775807 } } attr { key: "metadata" value { s: "\n\020RangeDataset:162" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } }
اگر از نمونه canonical tf.data.Dataset استفاده نمی کنم، چگونه داده های خود را توزیع کنم؟
گاهی اوقات کاربران نمی توانند از tf.data.Dataset
برای نمایش ورودی خود و متعاقباً از APIهای ذکر شده در بالا برای توزیع مجموعه داده در چندین دستگاه استفاده کنند. در چنین مواردی می توانید از تانسورهای خام یا ورودی های یک ژنراتور استفاده کنید.
برای ورودی های تانسور دلخواه، از functional_distribute_values_from_function استفاده کنید
strategy.run
tf.distribute.DistributedValues
را می پذیرد که خروجی next(iterator)
است. برای ارسال مقادیر تانسور، از experimental_distribute_values_from_function
برای ساخت tf.distribute.DistributedValues
از تانسورهای خام استفاده کنید.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
اگر ورودی شما از یک ژنراتور است، از tf.data.Dataset.from_generator استفاده کنید
اگر یک تابع مولد دارید که می خواهید استفاده کنید، می توانید یک نمونه tf.data.Dataset
با استفاده از from_generator
API ایجاد کنید.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2" op: "FlatMapDataset" input: "TensorDataset/_1" attr { key: "Targuments" value { list { } } } attr { key: "_cardinality" value { i: -2 } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_flat_map_fn_3980" } } } attr { key: "metadata" value { s: "\n\022FlatMapDataset:178" } } attr { key: "output_shapes" value { list { shape { dim { size: 4 } } } } } attr { key: "output_types" value { list { type: DT_FLOAT } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.