TensorFlow.org पर देखें | Google Colab में चलाएं | GitHub पर स्रोत देखें | नोटबुक डाउनलोड करें |
tf.distribute API उपयोगकर्ताओं को एक मशीन से कई मशीनों तक अपने प्रशिक्षण को स्केल करने का एक आसान तरीका प्रदान करता है। अपने मॉडल को स्केल करते समय, उपयोगकर्ताओं को अपने इनपुट को कई उपकरणों में वितरित करना होता है। tf.distribute
एपीआई प्रदान करता है जिसके उपयोग से आप अपने इनपुट को स्वचालित रूप से सभी उपकरणों में वितरित कर सकते हैं।
यह मार्गदर्शिका आपको tf.distribute
APIs का उपयोग करके वितरित डेटासेट और इटरेटर बनाने के विभिन्न तरीकों को दिखाएगी। इसके अतिरिक्त, निम्नलिखित विषयों को कवर किया जाएगा:
- tf.distribute.Strategy.experimental_distribute_dataset और
tf.distribute.Strategy.distribute_datasets_from_function
का उपयोग करते समय उपयोग,tf.distribute.Strategy.experimental_distribute_dataset
और बैचिंग विकल्प। - विभिन्न तरीकों से आप वितरित डेटासेट पर पुनरावृति कर सकते हैं।
-
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
APIs औरtf.data
APIs के बीच अंतर और साथ ही ऐसी कोई भी सीमाएं जो उपयोगकर्ताओं को उनके उपयोग में आ सकती हैं।
यह मार्गदर्शिका केरस एपीआई के साथ वितरित इनपुट के उपयोग को कवर नहीं करती है।
वितरित डेटासेट
बड़े पैमाने पर tf.वितरित API का उपयोग करने के लिए, यह अनुशंसा की जाती है कि उपयोगकर्ता अपने इनपुट का प्रतिनिधित्व करने के लिए tf.distribute
का उपयोग tf.data.Dataset
। tf.distribute
के साथ कुशलतापूर्वक कार्य करने के लिए tf.data.Dataset
.वितरण किया गया है। यदि आपके पास tf.data.Dataset
के अलावा किसी अन्य चीज़ का उपयोग करने के लिए उपयोग का मामला है, तो कृपया इस गाइड में बाद का अनुभाग देखें। एक गैर-वितरित प्रशिक्षण लूप में, उपयोगकर्ता पहले एक tf.data.Dataset
उदाहरण बनाते हैं और फिर तत्वों पर पुनरावृति करते हैं। उदाहरण के लिए:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
उपयोगकर्ताओं को उपयोगकर्ता के मौजूदा कोड में न्यूनतम परिवर्तनों के साथ tf.distribute
रणनीति का उपयोग करने की अनुमति देने के लिए, दो एपीआई पेश किए गए थे जो एक tf.data.Dataset
उदाहरण वितरित करेंगे और एक वितरित डेटासेट ऑब्जेक्ट लौटाएंगे। एक उपयोगकर्ता तब इस वितरित डेटासेट उदाहरण पर पुनरावृति कर सकता है और अपने मॉडल को पहले की तरह प्रशिक्षित कर सकता है। आइए अब हम दो एपीआई - tf.distribute.Strategy.experimental_distribute_dataset
और tf.distribute.Strategy.distribute_datasets_from_function
को अधिक विस्तार से देखें:
tf.distribute.Strategy.experimental_distribute_dataset
प्रयोग
यह एपीआई इनपुट के रूप में एक tf.data.Dataset
उदाहरण लेता है और एक tf.distribute.DistributedDataset
उदाहरण देता है। आपको इनपुट डेटासेट को वैश्विक बैच आकार के बराबर मान के साथ बैच करना चाहिए। यह वैश्विक बैच आकार उन नमूनों की संख्या है, जिन्हें आप 1 चरण में सभी उपकरणों पर संसाधित करना चाहते हैं। आप इस वितरित डेटासेट पर पाइथोनिक फैशन में पुनरावृति कर सकते हैं या iter
का उपयोग करके एक पुनरावर्तक बना सकते हैं। लौटाई गई वस्तु एक tf.data.Dataset
उदाहरण नहीं है और किसी भी अन्य API का समर्थन नहीं करती है जो किसी भी तरह से डेटासेट को रूपांतरित या निरीक्षण करता है। यह अनुशंसित एपीआई है यदि आपके पास विशिष्ट तरीके नहीं हैं जिसमें आप विभिन्न प्रतिकृतियों पर अपने इनपुट को शार्प करना चाहते हैं।
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>) 2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
गुण
बैचिंग
tf.वितरित इनपुट tf.distribute
उदाहरण को एक नए बैच आकार के साथ tf.data.Dataset
करता है जो सिंक में प्रतिकृतियों की संख्या से विभाजित वैश्विक बैच आकार के बराबर है। सिंक में प्रतिकृतियों की संख्या उन उपकरणों की संख्या के बराबर है जो प्रशिक्षण के दौरान ग्रेडिएंट सभी में भाग ले रहे हैं। जब कोई उपयोगकर्ता वितरित इटरेटर पर next
कॉल करता है, तो प्रत्येक प्रतिकृति पर डेटा का प्रति प्रतिकृति बैच आकार वापस कर दिया जाता है। रीबैच्ड डेटासेट कार्डिनैलिटी हमेशा प्रतिकृतियों की संख्या का गुणक होगी। यहां कुछ उदाहरण दिए गए हैं:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- वितरण के बिना:
- बैच 1: [0, 1, 2, 3]
- बैच 2: [4, 5]
2 प्रतिकृतियों पर वितरण के साथ। अंतिम बैच ([4, 5]) 2 प्रतिकृतियों के बीच विभाजित है।
बैच 1:
- प्रतिकृति 1:[0, 1]
- प्रतिकृति 2:[2, 3]
बैच 2:
- प्रतिकृति 2: [4]
- प्रतिकृति 2: [5]
tf.data.Dataset.range(4).batch(4)
- वितरण के बिना:
- बैच 1: [[0], [1], [2], [3]]
- 5 से अधिक प्रतिकृतियों के वितरण के साथ:
- बैच 1:
- प्रतिकृति 1: [0]
- प्रतिकृति 2: [1]
- प्रतिकृति 3: [2]
- प्रतिकृति 4: [3]
- प्रतिकृति 5: []
tf.data.Dataset.range(8).batch(4)
- वितरण के बिना:
- बैच 1: [0, 1, 2, 3]
- बैच 2: [4, 5, 6, 7]
- 3 प्रतिकृतियों पर वितरण के साथ:
- बैच 1:
- प्रतिकृति 1: [0, 1]
- प्रतिकृति 2: [2, 3]
- प्रतिकृति 3: []
- बैच 2:
- प्रतिकृति 1: [4, 5]
- प्रतिकृति 2: [6, 7]
- प्रतिकृति 3: []
डेटासेट को रीबैच करने में एक स्थान जटिलता होती है जो प्रतिकृतियों की संख्या के साथ रैखिक रूप से बढ़ती है। इसका मतलब है कि बहु-कार्यकर्ता प्रशिक्षण उपयोग के मामले में इनपुट पाइपलाइन OOM त्रुटियों में चल सकती है।
शेयरिंग
tf.distribute
MultiWorkerMirroredStrategy
और TPUStrategy
के साथ मल्टी वर्कर ट्रेनिंग में इनपुट डेटासेट को ऑटोशर्ड भी करता है। प्रत्येक डेटासेट कार्यकर्ता के सीपीयू डिवाइस पर बनाया जाता है। श्रमिकों के एक समूह पर एक डेटासेट को ऑटोशर्डिंग करने का अर्थ है कि प्रत्येक कार्यकर्ता को संपूर्ण डेटासेट का एक सबसेट सौंपा गया है (यदि सही tf.data.experimental.AutoShardPolicy
सेट है)। यह सुनिश्चित करने के लिए है कि प्रत्येक चरण में, गैर-अतिव्यापी डेटासेट तत्वों का एक वैश्विक बैच आकार प्रत्येक कार्यकर्ता द्वारा संसाधित किया जाएगा। ऑटोशर्डिंग में कुछ अलग विकल्प होते हैं जिन्हें tf.data.experimental.DistributeOptions
का उपयोग करके निर्दिष्ट किया जा सकता है। ध्यान दें कि ParameterServerStrategy
के साथ बहु-कार्यकर्ता प्रशिक्षण में कोई ऑटोशेयरिंग नहीं है, और इस रणनीति के साथ डेटासेट निर्माण पर अधिक जानकारी पैरामीटर सर्वर रणनीति ट्यूटोरियल में पाई जा सकती है।
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
आप tf.data.experimental.AutoShardPolicy
के लिए तीन अलग-अलग विकल्प सेट कर सकते हैं:
- AUTO: यह डिफ़ॉल्ट विकल्प है जिसका अर्थ है कि FILE द्वारा शार्प करने का प्रयास किया जाएगा। फ़ाइल-आधारित डेटासेट का पता नहीं चलने पर FILE द्वारा शार्प करने का प्रयास विफल हो जाता है।
tf.distribute
फिर DATA द्वारा शार्डिंग पर वापस आ जाएगा। ध्यान दें कि यदि इनपुट डेटासेट फ़ाइल-आधारित है, लेकिन फ़ाइलों की संख्या श्रमिकों की संख्या से कम है, तो एकInvalidArgumentError
उठाया जाएगा। यदि ऐसा होता है, तो नीति को स्पष्ट रूप सेAutoShardPolicy.DATA
पर सेट करें, या अपने इनपुट स्रोत को छोटी फ़ाइलों में विभाजित करें ताकि फाइलों की संख्या श्रमिकों की संख्या से अधिक हो। FILE: यह विकल्प है यदि आप सभी श्रमिकों पर इनपुट फ़ाइलों को शार्प करना चाहते हैं। आपको इस विकल्प का उपयोग करना चाहिए यदि इनपुट फाइलों की संख्या श्रमिकों की संख्या से बहुत अधिक है और फाइलों में डेटा समान रूप से वितरित किया गया है। यदि फाइलों में डेटा समान रूप से वितरित नहीं किया जाता है, तो इस विकल्प का नकारात्मक पक्ष निष्क्रिय श्रमिकों का होना है। यदि फाइलों की संख्या श्रमिकों की संख्या से कम है, तो एक
InvalidArgumentError
हो जाएगी। यदि ऐसा होता है, तो नीति को स्पष्ट रूप सेAutoShardPolicy.DATA
पर सेट करें। उदाहरण के लिए, आइए हम 2 फाइलों को 2 कर्मचारियों पर वितरित करें जिनमें से प्रत्येक में 1 प्रतिकृति है। फ़ाइल 1 में [0, 1, 2, 3, 4, 5] और फ़ाइल 2 में [6, 7, 8, 9, 10, 11] हैं। बता दें कि सिंक में प्रतिकृतियों की कुल संख्या 2 है और वैश्विक बैच का आकार 4 है।- कार्यकर्ता 0:
- बैच 1 = प्रतिकृति 1: [0, 1]
- बैच 2 = प्रतिकृति 1: [2, 3]
- बैच 3 = प्रतिकृति 1: [4]
- बैच 4 = प्रतिकृति 1: [5]
- कार्यकर्ता 1:
- बैच 1 = प्रतिकृति 2: [6, 7]
- बैच 2 = प्रतिकृति 2: [8, 9]
- बैच 3 = प्रतिकृति 2: [10]
- बैच 4 = प्रतिकृति 2: [11]
डेटा: यह सभी कर्मचारियों के तत्वों को ऑटोशर्ड करेगा। प्रत्येक कार्यकर्ता पूरे डेटासेट को पढ़ेगा और केवल उसे सौंपे गए शार्प को प्रोसेस करेगा। अन्य सभी शार्क को त्याग दिया जाएगा। यह आमतौर पर तब उपयोग किया जाता है जब इनपुट फाइलों की संख्या श्रमिकों की संख्या से कम होती है और आप सभी श्रमिकों के डेटा को बेहतर तरीके से साझा करना चाहते हैं। नकारात्मक पक्ष यह है कि प्रत्येक कार्यकर्ता पर संपूर्ण डेटासेट पढ़ा जाएगा। उदाहरण के लिए, आइए हम 2 कर्मचारियों पर 1 फाइल वितरित करें। फ़ाइल 1 में [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] शामिल हैं। बता दें कि सिंक में प्रतिकृतियों की कुल संख्या 2 है।
- कार्यकर्ता 0:
- बैच 1 = प्रतिकृति 1: [0, 1]
- बैच 2 = प्रतिकृति 1: [4, 5]
- बैच 3 = प्रतिकृति 1: [8, 9]
- कार्यकर्ता 1:
- बैच 1 = प्रतिकृति 2: [2, 3]
- बैच 2 = प्रतिकृति 2: [6, 7]
- बैच 3 = प्रतिकृति 2: [10, 11]
बंद: यदि आप ऑटोशेयरिंग को बंद करते हैं, तो प्रत्येक कार्यकर्ता सभी डेटा को संसाधित करेगा। उदाहरण के लिए, आइए हम 2 कर्मचारियों पर 1 फाइल वितरित करें। फ़ाइल 1 में [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] शामिल हैं। सिंक में प्रतिकृतियों की कुल संख्या 2 होने दें। फिर प्रत्येक कार्यकर्ता को निम्नलिखित वितरण दिखाई देगा:
- कार्यकर्ता 0:
- बैच 1 = प्रतिकृति 1: [0, 1]
- बैच 2 = प्रतिकृति 1: [2, 3]
- बैच 3 = प्रतिकृति 1: [4, 5]
- बैच 4 = प्रतिकृति 1: [6, 7]
- बैच 5 = प्रतिकृति 1: [8, 9]
बैच 6 = प्रतिकृति 1: [10, 11]
कार्यकर्ता 1:
बैच 1 = प्रतिकृति 2: [0, 1]
बैच 2 = प्रतिकृति 2: [2, 3]
बैच 3 = प्रतिकृति 2: [4, 5]
बैच 4 = प्रतिकृति 2: [6, 7]
बैच 5 = प्रतिकृति 2: [8, 9]
बैच 6 = प्रतिकृति 2: [10, 11]
प्रीफेचिंग
डिफ़ॉल्ट रूप से, tf.distribute
उपयोगकर्ता द्वारा प्रदान किए गए tf.data.Dataset
उदाहरण के अंत में एक प्रीफ़ेच परिवर्तन जोड़ता है। प्रीफ़ेच ट्रांसफ़ॉर्मेशन का तर्क जो कि buffer_size
है, सिंक में प्रतिकृतियों की संख्या के बराबर है।
tf.distribute.Strategy.distribute_datasets_from_function
प्रयोग
यह एपीआई एक इनपुट फ़ंक्शन लेता है और एक tf.distribute.DistributedDataset
उदाहरण देता है। उपयोगकर्ता द्वारा पास किए जाने वाले इनपुट फ़ंक्शन में tf.distribute.InputContext
तर्क होता है और उसे tf.data.Dataset
उदाहरण वापस करना चाहिए। इस एपीआई के साथ, tf.distribute
उपयोगकर्ता के tf.data.Dataset
में कोई और बदलाव नहीं करता है। इनपुट फ़ंक्शन से लौटाए गए डेटासेट इंस्टेंस। डेटासेट को बैच और शार्प करना उपयोगकर्ता की जिम्मेदारी है। tf.distribute
प्रत्येक कार्यकर्ता के CPU डिवाइस पर इनपुट फ़ंक्शन को कॉल करता है। उपयोगकर्ताओं को अपने स्वयं के बैचिंग और शार्डिंग लॉजिक को निर्दिष्ट करने की अनुमति देने के अलावा, यह एपीआई बहु-कार्यकर्ता प्रशिक्षण के लिए उपयोग किए जाने पर tf.distribute.Strategy.experimental_distribute_dataset
की तुलना में बेहतर मापनीयता और प्रदर्शन भी प्रदर्शित करता है।
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
गुण
बैचिंग
tf.data.Dataset
उदाहरण जो कि इनपुट फ़ंक्शन का रिटर्न मान है, प्रति प्रतिकृति बैच आकार का उपयोग करके बैच किया जाना चाहिए। प्रति प्रतिकृति बैच आकार वैश्विक बैच आकार है जो सिंक प्रशिक्षण में भाग लेने वाले प्रतिकृतियों की संख्या से विभाजित होता है। ऐसा इसलिए है क्योंकि tf.distribute
प्रत्येक कार्यकर्ता के CPU डिवाइस पर इनपुट फ़ंक्शन को कॉल करता है। किसी दिए गए कार्यकर्ता पर बनाया गया डेटासेट उस कार्यकर्ता पर सभी प्रतिकृतियों द्वारा उपयोग के लिए तैयार होना चाहिए।
शेयरिंग
tf.distribute.InputContext
ऑब्जेक्ट जो परोक्ष रूप से उपयोगकर्ता के इनपुट फ़ंक्शन के लिए एक तर्क के रूप में पारित किया जाता है, हुड के तहत tf.distribute
.वितरण द्वारा बनाया जाता है। इसमें श्रमिकों की संख्या, वर्तमान कार्यकर्ता आईडी आदि के बारे में जानकारी है। यह इनपुट फ़ंक्शन इन गुणों का उपयोग करके उपयोगकर्ता द्वारा निर्धारित नीतियों के अनुसार शार्डिंग को संभाल सकता है जो tf.distribute.InputContext
ऑब्जेक्ट का हिस्सा हैं।
प्रीफेचिंग
tf. tf.distribute
के अंत में एक प्रीफ़ेच परिवर्तन नहीं जोड़ता है। उपयोगकर्ता द्वारा प्रदान किए गए इनपुट फ़ंक्शन द्वारा लौटाया गया tf.data.Dataset
।
वितरित इटरेटर्स
गैर-वितरित tf.data.Dataset
उदाहरणों के समान, आपको tf.distribute.DistributedDataset
उदाहरणों पर पुनरावृति करने और tf.distribute.DistributedDataset
में तत्वों तक पहुंचने के लिए एक पुनरावर्तक बनाने की आवश्यकता होगी। निम्नलिखित तरीके हैं जिनसे आप एक tf.distribute.DistributedIterator
बना सकते हैं और अपने मॉडल को प्रशिक्षित करने के लिए इसका उपयोग कर सकते हैं:
उपयोगों
लूप निर्माण के लिए पाइथोनिक का उपयोग करें
आप tf.distribute.DistributedDataset पर पुनरावृति करने के लिए एक उपयोगकर्ता के अनुकूल tf.distribute.DistributedDataset
लूप का उपयोग कर सकते हैं। tf.distribute.DistributedIterator
से लौटाए गए तत्व एकल tf.Tensor
या tf.distribute.DistributedValues
हो सकते हैं जिसमें प्रति प्रतिकृति एक मान होता है। लूप को tf.function
के अंदर रखने से प्रदर्शन को बढ़ावा मिलेगा। हालांकि, break
और return
वर्तमान में tf.distribute.DistributedDataset
tf.function
अंदर रखा गया है।
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\020TensorDataset:29" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
एक स्पष्ट इटरेटर बनाने के लिए iter
का उपयोग करें
tf.distribute.DistributedDataset
इंस्टेंस में तत्वों पर पुनरावृति करने के लिए, आप उस पर iter
API का उपयोग करके tf.distribute.DistributedIterator
बना सकते हैं। एक स्पष्ट पुनरावर्तक के साथ, आप निश्चित चरणों के लिए पुनरावृति कर सकते हैं। tf.distribute.DistributedIterator
उदाहरण dist_iterator
से अगला तत्व प्राप्त करने के लिए, आप next(dist_iterator)
, dist_iterator.get_next()
, या dist_iterator.get_next_as_optional()
पर कॉल कर सकते हैं। पूर्व दो अनिवार्य रूप से समान हैं:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
next()
या tf.distribute.DistributedIterator.get_next()
के साथ, यदि tf.distribute.DistributedIterator
अपने अंत तक पहुँच गया है, तो एक OutOfRange त्रुटि फेंक दी जाएगी। क्लाइंट अजगर की तरफ से त्रुटि पकड़ सकता है और अन्य काम जैसे कि चेकपॉइंटिंग और मूल्यांकन करना जारी रख सकता है। हालांकि, यह काम नहीं करेगा यदि आप एक मेजबान प्रशिक्षण लूप का उपयोग कर रहे हैं (यानी, प्रति tf.function
कई चरणों को चलाएं), जो इस तरह दिखता है:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
में tf.range
के अंदर स्टेप बॉडी को लपेटकर कई चरण होते हैं। इस मामले में, बिना किसी निर्भरता के लूप में अलग-अलग पुनरावृत्तियों समानांतर में शुरू हो सकते हैं, इसलिए पिछले पुनरावृत्तियों की गणना समाप्त होने से पहले बाद के पुनरावृत्तियों में एक आउटऑफरेंज त्रुटि ट्रिगर की जा सकती है। एक बार OutOfRange त्रुटि होने के बाद, फ़ंक्शन के सभी ऑप्स तुरंत समाप्त कर दिए जाएंगे। यदि यह कुछ ऐसा मामला है जिससे आप बचना चाहते हैं, तो एक विकल्प जो आउटऑफरेंज त्रुटि नहीं फेंकता है वह है tf.distribute.DistributedIterator.get_next_as_optional()
। get_next_as_optional
एक tf.experimental.Optional
देता है जिसमें tf.distribute.DistributedIterator
समाप्त होने पर अगला तत्व या कोई मान नहीं होता है।
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') 2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_0" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9 } } attr { key: "metadata" value { s: "\n\020RangeDataset:104" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
element_spec
संपत्ति का उपयोग करना
यदि आप एक वितरित डेटासेट के तत्वों को tf.function
में पास करते हैं और एक tf.TypeSpec
गारंटी चाहते हैं, तो आप tf.function
के input_signature
तर्क को निर्दिष्ट कर सकते हैं। एक वितरित डेटासेट का आउटपुट tf.distribute.DistributedValues
है जो एक डिवाइस या कई डिवाइस के इनपुट का प्रतिनिधित्व कर सकता है। इस वितरित मूल्य के अनुरूप tf.TypeSpec
प्राप्त करने के लिए आप वितरित डेटासेट या वितरित इटरेटर ऑब्जेक्ट की element_spec
संपत्ति का उपयोग कर सकते हैं।
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:122" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
आंशिक बैच
आंशिक बैचों का सामना तब होता है जब tf.data.Dataset
उदाहरण जो उपयोगकर्ता बनाते हैं उनमें बैच आकार हो सकते हैं जो प्रतिकृतियों की संख्या से समान रूप से विभाज्य नहीं होते हैं या जब डेटासेट आवृत्ति की कार्डिनैलिटी बैच आकार से विभाज्य नहीं होती है। इसका मतलब यह है कि जब डेटासेट को कई प्रतियों में वितरित किया जाता है, तो कुछ पुनरावृत्तियों पर next
कॉल के परिणामस्वरूप OutOfRangeError होगा। इस उपयोग के मामले को संभालने के लिए, tf.distribute
उन प्रतिकृतियों पर बैच आकार 0 के डमी बैच देता है जिनके पास संसाधित करने के लिए कोई और डेटा नहीं है।
सिंगल वर्कर केस के लिए, यदि डेटा इटरेटर पर next
कॉल द्वारा वापस नहीं किया जाता है, तो 0 बैच आकार के डमी बैच बनाए जाते हैं और डेटासेट में वास्तविक डेटा के साथ उपयोग किए जाते हैं। आंशिक बैचों के मामले में, डेटा के अंतिम वैश्विक बैच में डेटा के डमी बैचों के साथ वास्तविक डेटा होगा। डेटा को संसाधित करने के लिए रुकने की स्थिति अब जाँचती है कि क्या किसी भी प्रतिकृति में डेटा है। यदि किसी भी प्रतिकृति पर कोई डेटा नहीं है, तो आउटऑफरेंज त्रुटि फेंक दी जाती है।
बहु कार्यकर्ता मामले के लिए, प्रत्येक कार्यकर्ता पर डेटा की उपस्थिति का प्रतिनिधित्व करने वाला बूलियन मान क्रॉस प्रतिकृति संचार का उपयोग करके एकत्रित किया जाता है और इसका उपयोग यह पहचानने के लिए किया जाता है कि सभी श्रमिकों ने वितरित डेटासेट को संसाधित करना समाप्त कर दिया है या नहीं। चूंकि इसमें क्रॉस वर्कर संचार शामिल है, इसमें कुछ प्रदर्शन दंड शामिल है।
चेतावनियां
एकाधिक कार्यकर्ता सेटअप के साथ
tf.distribute.Strategy.experimental_distribute_dataset
API का उपयोग करते समय, उपयोगकर्ता एकtf.data.Dataset
पास करते हैं जो फ़ाइलों से पढ़ता है। यदिtf.data.experimental.AutoShardPolicy
कोAUTO
याFILE
पर सेट किया जाता है, तो वास्तविक प्रति चरण बैच आकार उपयोगकर्ता द्वारा परिभाषित वैश्विक बैच आकार से छोटा हो सकता है। यह तब हो सकता है जब फ़ाइल में शेष तत्व वैश्विक बैच आकार से कम हों। उपयोगकर्ता या तो डेटासेट को चलाने के चरणों की संख्या के आधार पर समाप्त कर सकते हैं या इसके आसपास काम करने के लिएtf.data.experimental.AutoShardPolicy
कोDATA
पर सेट कर सकते हैं।स्टेटफुल डेटासेट ट्रांसफॉर्मेशन वर्तमान में
tf.distribute
के साथ समर्थित नहीं हैं और डेटासेट के किसी भी स्टेटफुल ऑप्स को वर्तमान में अनदेखा किया जा सकता है। उदाहरण के लिए, यदि आपके डेटासेट में एकmap_fn
है जो किसी छवि को घुमाने के लिएtf.random.uniform
का उपयोग करता है, तो आपके पास एक डेटासेट ग्राफ़ है जो स्थानीय मशीन पर राज्य (यानी यादृच्छिक बीज) पर निर्भर करता है जहां पायथन प्रक्रिया निष्पादित की जा रही है।प्रायोगिक
tf.data.experimental.OptimizationOptions
जो डिफ़ॉल्ट रूप से अक्षम होते हैं, कुछ संदर्भों में - जैसे किtf.distribute
.वितरण के साथ उपयोग किए जाने पर - प्रदर्शन में गिरावट का कारण बन सकते हैं। आपको उन्हें तभी सक्षम करना चाहिए जब आप यह सत्यापित कर लें कि वे वितरण सेटिंग में आपके कार्यभार के प्रदर्शन को लाभान्वित करते हैं।सामान्य रूप से
tf.data
के साथ अपनी इनपुट पाइपलाइन को कैसे अनुकूलित करें, इसके लिए कृपया इस गाइड को देखें। कुछ अतिरिक्त टिप्स:यदि आपके पास एक से अधिक कर्मचारी हैं और एक या अधिक ग्लोब पैटर्न से मेल खाने वाली सभी फ़ाइलों से डेटासेट बनाने के लिए
tf.data.Dataset.list_files
का उपयोग कर रहे हैं, तो याद रखें किseed
तर्क सेट करें याshuffle=False
सेट करें ताकि प्रत्येक कार्यकर्ता फ़ाइल को लगातार शार्प करे।यदि आपकी इनपुट पाइपलाइन में रिकॉर्ड स्तर पर डेटा को शफ़ल करना और डेटा को पार्स करना दोनों शामिल हैं, जब तक कि अनपेक्षित डेटा पार्स किए गए डेटा (जो आमतौर पर ऐसा नहीं है) से काफी बड़ा है, पहले फेरबदल करें और फिर पार्स करें, जैसा कि निम्नलिखित उदाहरण में दिखाया गया है। यह स्मृति उपयोग और प्रदर्शन को लाभ पहुंचा सकता है।
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
buffer_size
तत्वों के आंतरिक बफर को बनाए रखता है, और इस प्रकारbuffer_size
को कम करने से OOM समस्या कम हो सकती है।tf.distribute.experimental_distribute_dataset
याtf.distribute.distribute_datasets_from_function
का उपयोग करते समय श्रमिकों द्वारा डेटा को संसाधित करने के क्रम की गारंटी नहीं है। यह आमतौर पर तब आवश्यक होता है जब आपtf.distribute
से स्केल भविष्यवाणी का उपयोग कर रहे हों। हालांकि आप बैच में प्रत्येक तत्व के लिए एक इंडेक्स सम्मिलित कर सकते हैं और तदनुसार आउटपुट ऑर्डर कर सकते हैं। निम्न स्निपेट आउटपुट ऑर्डर करने का एक उदाहरण है।
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46} 2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_4" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9223372036854775807 } } attr { key: "metadata" value { s: "\n\020RangeDataset:162" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } }
यदि मैं विहित tf.data.Dataset उदाहरण का उपयोग नहीं कर रहा हूँ, तो मैं अपना डेटा कैसे वितरित करूँ?
कभी-कभी उपयोगकर्ता अपने इनपुट का प्रतिनिधित्व करने के लिए tf.data.Dataset
का उपयोग नहीं कर सकते हैं और बाद में उपर्युक्त एपीआई का उपयोग डेटासेट को कई उपकरणों में वितरित करने के लिए कर सकते हैं। ऐसे मामलों में आप जनरेटर से कच्चे टेंसर या इनपुट का उपयोग कर सकते हैं।
मनमाना टेंसर इनपुट के लिए प्रयोग_वितरण_वैल्यू_फ्रॉम_फंक्शन का उपयोग करें
strategy.run
tf.distribute.DistributedValues
स्वीकार करता है जो next(iterator)
का आउटपुट है। टेंसर मानों को पास करने के लिए, tf.वितरण का निर्माण करने के लिए experimental_distribute_values_from_function
tf.distribute.DistributedValues
का उपयोग करें। कच्चे टेंसर से वितरित मान।
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
यदि आपका इनपुट जनरेटर से है तो tf.data.Dataset.from_generator का उपयोग करें
यदि आपके पास एक जनरेटर फ़ंक्शन है जिसका आप उपयोग करना चाहते हैं, तो आप from_generator
API का उपयोग करके एक tf.data.Dataset
उदाहरण बना सकते हैं।
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2" op: "FlatMapDataset" input: "TensorDataset/_1" attr { key: "Targuments" value { list { } } } attr { key: "_cardinality" value { i: -2 } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_flat_map_fn_3980" } } } attr { key: "metadata" value { s: "\n\022FlatMapDataset:178" } } attr { key: "output_shapes" value { list { shape { dim { size: 4 } } } } } attr { key: "output_types" value { list { type: DT_FLOAT } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.