वितरित इनपुट

TensorFlow.org पर देखें Google Colab में चलाएं GitHub पर स्रोत देखें नोटबुक डाउनलोड करें

tf.distribute API उपयोगकर्ताओं को एक मशीन से कई मशीनों तक अपने प्रशिक्षण को स्केल करने का एक आसान तरीका प्रदान करता है। अपने मॉडल को स्केल करते समय, उपयोगकर्ताओं को अपने इनपुट को कई उपकरणों में वितरित करना होता है। tf.distribute एपीआई प्रदान करता है जिसके उपयोग से आप अपने इनपुट को स्वचालित रूप से सभी उपकरणों में वितरित कर सकते हैं।

यह मार्गदर्शिका आपको tf.distribute APIs का उपयोग करके वितरित डेटासेट और इटरेटर बनाने के विभिन्न तरीकों को दिखाएगी। इसके अतिरिक्त, निम्नलिखित विषयों को कवर किया जाएगा:

यह मार्गदर्शिका केरस एपीआई के साथ वितरित इनपुट के उपयोग को कवर नहीं करती है।

वितरित डेटासेट

बड़े पैमाने पर tf.वितरित API का उपयोग करने के लिए, यह अनुशंसा की जाती है कि उपयोगकर्ता अपने इनपुट का प्रतिनिधित्व करने के लिए tf.distribute का उपयोग tf.data.Datasettf.distribute के साथ कुशलतापूर्वक कार्य करने के लिए tf.data.Dataset .वितरण किया गया है। यदि आपके पास tf.data.Dataset के अलावा किसी अन्य चीज़ का उपयोग करने के लिए उपयोग का मामला है, तो कृपया इस गाइड में बाद का अनुभाग देखें। एक गैर-वितरित प्रशिक्षण लूप में, उपयोगकर्ता पहले एक tf.data.Dataset उदाहरण बनाते हैं और फिर तत्वों पर पुनरावृति करते हैं। उदाहरण के लिए:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

उपयोगकर्ताओं को उपयोगकर्ता के मौजूदा कोड में न्यूनतम परिवर्तनों के साथ tf.distribute रणनीति का उपयोग करने की अनुमति देने के लिए, दो एपीआई पेश किए गए थे जो एक tf.data.Dataset उदाहरण वितरित करेंगे और एक वितरित डेटासेट ऑब्जेक्ट लौटाएंगे। एक उपयोगकर्ता तब इस वितरित डेटासेट उदाहरण पर पुनरावृति कर सकता है और अपने मॉडल को पहले की तरह प्रशिक्षित कर सकता है। आइए अब हम दो एपीआई - tf.distribute.Strategy.experimental_distribute_dataset और tf.distribute.Strategy.distribute_datasets_from_function को अधिक विस्तार से देखें:

tf.distribute.Strategy.experimental_distribute_dataset

प्रयोग

यह एपीआई इनपुट के रूप में एक tf.data.Dataset उदाहरण लेता है और एक tf.distribute.DistributedDataset उदाहरण देता है। आपको इनपुट डेटासेट को वैश्विक बैच आकार के बराबर मान के साथ बैच करना चाहिए। यह वैश्विक बैच आकार उन नमूनों की संख्या है, जिन्हें आप 1 चरण में सभी उपकरणों पर संसाधित करना चाहते हैं। आप इस वितरित डेटासेट पर पाइथोनिक फैशन में पुनरावृति कर सकते हैं या iter का उपयोग करके एक पुनरावर्तक बना सकते हैं। लौटाई गई वस्तु एक tf.data.Dataset उदाहरण नहीं है और किसी भी अन्य API का समर्थन नहीं करती है जो किसी भी तरह से डेटासेट को रूपांतरित या निरीक्षण करता है। यह अनुशंसित एपीआई है यदि आपके पास विशिष्ट तरीके नहीं हैं जिसमें आप विभिन्न प्रतिकृतियों पर अपने इनपुट को शार्प करना चाहते हैं।

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)
2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\017TensorDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}

गुण

बैचिंग

tf.वितरित इनपुट tf.distribute उदाहरण को एक नए बैच आकार के साथ tf.data.Dataset करता है जो सिंक में प्रतिकृतियों की संख्या से विभाजित वैश्विक बैच आकार के बराबर है। सिंक में प्रतिकृतियों की संख्या उन उपकरणों की संख्या के बराबर है जो प्रशिक्षण के दौरान ग्रेडिएंट सभी में भाग ले रहे हैं। जब कोई उपयोगकर्ता वितरित इटरेटर पर next कॉल करता है, तो प्रत्येक प्रतिकृति पर डेटा का प्रति प्रतिकृति बैच आकार वापस कर दिया जाता है। रीबैच्ड डेटासेट कार्डिनैलिटी हमेशा प्रतिकृतियों की संख्या का गुणक होगी। यहां कुछ उदाहरण दिए गए हैं:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • वितरण के बिना:
    • बैच 1: [0, 1, 2, 3]
    • बैच 2: [4, 5]
    • 2 प्रतिकृतियों पर वितरण के साथ। अंतिम बैच ([4, 5]) 2 प्रतिकृतियों के बीच विभाजित है।

    • बैच 1:

      • प्रतिकृति 1:[0, 1]
      • प्रतिकृति 2:[2, 3]
    • बैच 2:

      • प्रतिकृति 2: [4]
      • प्रतिकृति 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • वितरण के बिना:
    • बैच 1: [[0], [1], [2], [3]]
    • 5 से अधिक प्रतिकृतियों के वितरण के साथ:
    • बैच 1:
      • प्रतिकृति 1: [0]
      • प्रतिकृति 2: [1]
      • प्रतिकृति 3: [2]
      • प्रतिकृति 4: [3]
      • प्रतिकृति 5: []
  • tf.data.Dataset.range(8).batch(4)

    • वितरण के बिना:
    • बैच 1: [0, 1, 2, 3]
    • बैच 2: [4, 5, 6, 7]
    • 3 प्रतिकृतियों पर वितरण के साथ:
    • बैच 1:
      • प्रतिकृति 1: [0, 1]
      • प्रतिकृति 2: [2, 3]
      • प्रतिकृति 3: []
    • बैच 2:
      • प्रतिकृति 1: [4, 5]
      • प्रतिकृति 2: [6, 7]
      • प्रतिकृति 3: []

डेटासेट को रीबैच करने में एक स्थान जटिलता होती है जो प्रतिकृतियों की संख्या के साथ रैखिक रूप से बढ़ती है। इसका मतलब है कि बहु-कार्यकर्ता प्रशिक्षण उपयोग के मामले में इनपुट पाइपलाइन OOM त्रुटियों में चल सकती है।

शेयरिंग

tf.distribute MultiWorkerMirroredStrategy और TPUStrategy के साथ मल्टी वर्कर ट्रेनिंग में इनपुट डेटासेट को ऑटोशर्ड भी करता है। प्रत्येक डेटासेट कार्यकर्ता के सीपीयू डिवाइस पर बनाया जाता है। श्रमिकों के एक समूह पर एक डेटासेट को ऑटोशर्डिंग करने का अर्थ है कि प्रत्येक कार्यकर्ता को संपूर्ण डेटासेट का एक सबसेट सौंपा गया है (यदि सही tf.data.experimental.AutoShardPolicy सेट है)। यह सुनिश्चित करने के लिए है कि प्रत्येक चरण में, गैर-अतिव्यापी डेटासेट तत्वों का एक वैश्विक बैच आकार प्रत्येक कार्यकर्ता द्वारा संसाधित किया जाएगा। ऑटोशर्डिंग में कुछ अलग विकल्प होते हैं जिन्हें tf.data.experimental.DistributeOptions का उपयोग करके निर्दिष्ट किया जा सकता है। ध्यान दें कि ParameterServerStrategy के साथ बहु-कार्यकर्ता प्रशिक्षण में कोई ऑटोशेयरिंग नहीं है, और इस रणनीति के साथ डेटासेट निर्माण पर अधिक जानकारी पैरामीटर सर्वर रणनीति ट्यूटोरियल में पाई जा सकती है।

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

आप tf.data.experimental.AutoShardPolicy के लिए तीन अलग-अलग विकल्प सेट कर सकते हैं:

  • AUTO: यह डिफ़ॉल्ट विकल्प है जिसका अर्थ है कि FILE द्वारा शार्प करने का प्रयास किया जाएगा। फ़ाइल-आधारित डेटासेट का पता नहीं चलने पर FILE द्वारा शार्प करने का प्रयास विफल हो जाता है। tf.distribute फिर DATA द्वारा शार्डिंग पर वापस आ जाएगा। ध्यान दें कि यदि इनपुट डेटासेट फ़ाइल-आधारित है, लेकिन फ़ाइलों की संख्या श्रमिकों की संख्या से कम है, तो एक InvalidArgumentError उठाया जाएगा। यदि ऐसा होता है, तो नीति को स्पष्ट रूप से AutoShardPolicy.DATA पर सेट करें, या अपने इनपुट स्रोत को छोटी फ़ाइलों में विभाजित करें ताकि फाइलों की संख्या श्रमिकों की संख्या से अधिक हो।
  • FILE: यह विकल्प है यदि आप सभी श्रमिकों पर इनपुट फ़ाइलों को शार्प करना चाहते हैं। आपको इस विकल्प का उपयोग करना चाहिए यदि इनपुट फाइलों की संख्या श्रमिकों की संख्या से बहुत अधिक है और फाइलों में डेटा समान रूप से वितरित किया गया है। यदि फाइलों में डेटा समान रूप से वितरित नहीं किया जाता है, तो इस विकल्प का नकारात्मक पक्ष निष्क्रिय श्रमिकों का होना है। यदि फाइलों की संख्या श्रमिकों की संख्या से कम है, तो एक InvalidArgumentError हो जाएगी। यदि ऐसा होता है, तो नीति को स्पष्ट रूप से AutoShardPolicy.DATA पर सेट करें। उदाहरण के लिए, आइए हम 2 फाइलों को 2 कर्मचारियों पर वितरित करें जिनमें से प्रत्येक में 1 प्रतिकृति है। फ़ाइल 1 में [0, 1, 2, 3, 4, 5] और फ़ाइल 2 में [6, 7, 8, 9, 10, 11] हैं। बता दें कि सिंक में प्रतिकृतियों की कुल संख्या 2 है और वैश्विक बैच का आकार 4 है।

    • कार्यकर्ता 0:
    • बैच 1 = प्रतिकृति 1: [0, 1]
    • बैच 2 = प्रतिकृति 1: [2, 3]
    • बैच 3 = प्रतिकृति 1: [4]
    • बैच 4 = प्रतिकृति 1: [5]
    • कार्यकर्ता 1:
    • बैच 1 = प्रतिकृति 2: [6, 7]
    • बैच 2 = प्रतिकृति 2: [8, 9]
    • बैच 3 = प्रतिकृति 2: [10]
    • बैच 4 = प्रतिकृति 2: [11]
  • डेटा: यह सभी कर्मचारियों के तत्वों को ऑटोशर्ड करेगा। प्रत्येक कार्यकर्ता पूरे डेटासेट को पढ़ेगा और केवल उसे सौंपे गए शार्प को प्रोसेस करेगा। अन्य सभी शार्क को त्याग दिया जाएगा। यह आमतौर पर तब उपयोग किया जाता है जब इनपुट फाइलों की संख्या श्रमिकों की संख्या से कम होती है और आप सभी श्रमिकों के डेटा को बेहतर तरीके से साझा करना चाहते हैं। नकारात्मक पक्ष यह है कि प्रत्येक कार्यकर्ता पर संपूर्ण डेटासेट पढ़ा जाएगा। उदाहरण के लिए, आइए हम 2 कर्मचारियों पर 1 फाइल वितरित करें। फ़ाइल 1 में [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] शामिल हैं। बता दें कि सिंक में प्रतिकृतियों की कुल संख्या 2 है।

    • कार्यकर्ता 0:
    • बैच 1 = प्रतिकृति 1: [0, 1]
    • बैच 2 = प्रतिकृति 1: [4, 5]
    • बैच 3 = प्रतिकृति 1: [8, 9]
    • कार्यकर्ता 1:
    • बैच 1 = प्रतिकृति 2: [2, 3]
    • बैच 2 = प्रतिकृति 2: [6, 7]
    • बैच 3 = प्रतिकृति 2: [10, 11]
  • बंद: यदि आप ऑटोशेयरिंग को बंद करते हैं, तो प्रत्येक कार्यकर्ता सभी डेटा को संसाधित करेगा। उदाहरण के लिए, आइए हम 2 कर्मचारियों पर 1 फाइल वितरित करें। फ़ाइल 1 में [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] शामिल हैं। सिंक में प्रतिकृतियों की कुल संख्या 2 होने दें। फिर प्रत्येक कार्यकर्ता को निम्नलिखित वितरण दिखाई देगा:

    • कार्यकर्ता 0:
    • बैच 1 = प्रतिकृति 1: [0, 1]
    • बैच 2 = प्रतिकृति 1: [2, 3]
    • बैच 3 = प्रतिकृति 1: [4, 5]
    • बैच 4 = प्रतिकृति 1: [6, 7]
    • बैच 5 = प्रतिकृति 1: [8, 9]
    • बैच 6 = प्रतिकृति 1: [10, 11]

    • कार्यकर्ता 1:

    • बैच 1 = प्रतिकृति 2: [0, 1]

    • बैच 2 = प्रतिकृति 2: [2, 3]

    • बैच 3 = प्रतिकृति 2: [4, 5]

    • बैच 4 = प्रतिकृति 2: [6, 7]

    • बैच 5 = प्रतिकृति 2: [8, 9]

    • बैच 6 = प्रतिकृति 2: [10, 11]

प्रीफेचिंग

डिफ़ॉल्ट रूप से, tf.distribute उपयोगकर्ता द्वारा प्रदान किए गए tf.data.Dataset उदाहरण के अंत में एक प्रीफ़ेच परिवर्तन जोड़ता है। प्रीफ़ेच ट्रांसफ़ॉर्मेशन का तर्क जो कि buffer_size है, सिंक में प्रतिकृतियों की संख्या के बराबर है।

tf.distribute.Strategy.distribute_datasets_from_function

प्रयोग

यह एपीआई एक इनपुट फ़ंक्शन लेता है और एक tf.distribute.DistributedDataset उदाहरण देता है। उपयोगकर्ता द्वारा पास किए जाने वाले इनपुट फ़ंक्शन में tf.distribute.InputContext तर्क होता है और उसे tf.data.Dataset उदाहरण वापस करना चाहिए। इस एपीआई के साथ, tf.distribute उपयोगकर्ता के tf.data.Dataset में कोई और बदलाव नहीं करता है। इनपुट फ़ंक्शन से लौटाए गए डेटासेट इंस्टेंस। डेटासेट को बैच और शार्प करना उपयोगकर्ता की जिम्मेदारी है। tf.distribute प्रत्येक कार्यकर्ता के CPU डिवाइस पर इनपुट फ़ंक्शन को कॉल करता है। उपयोगकर्ताओं को अपने स्वयं के बैचिंग और शार्डिंग लॉजिक को निर्दिष्ट करने की अनुमति देने के अलावा, यह एपीआई बहु-कार्यकर्ता प्रशिक्षण के लिए उपयोग किए जाने पर tf.distribute.Strategy.experimental_distribute_dataset की तुलना में बेहतर मापनीयता और प्रदर्शन भी प्रदर्शित करता है।

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

गुण

बैचिंग

tf.data.Dataset उदाहरण जो कि इनपुट फ़ंक्शन का रिटर्न मान है, प्रति प्रतिकृति बैच आकार का उपयोग करके बैच किया जाना चाहिए। प्रति प्रतिकृति बैच आकार वैश्विक बैच आकार है जो सिंक प्रशिक्षण में भाग लेने वाले प्रतिकृतियों की संख्या से विभाजित होता है। ऐसा इसलिए है क्योंकि tf.distribute प्रत्येक कार्यकर्ता के CPU डिवाइस पर इनपुट फ़ंक्शन को कॉल करता है। किसी दिए गए कार्यकर्ता पर बनाया गया डेटासेट उस कार्यकर्ता पर सभी प्रतिकृतियों द्वारा उपयोग के लिए तैयार होना चाहिए।

शेयरिंग

tf.distribute.InputContext ऑब्जेक्ट जो परोक्ष रूप से उपयोगकर्ता के इनपुट फ़ंक्शन के लिए एक तर्क के रूप में पारित किया जाता है, हुड के तहत tf.distribute .वितरण द्वारा बनाया जाता है। इसमें श्रमिकों की संख्या, वर्तमान कार्यकर्ता आईडी आदि के बारे में जानकारी है। यह इनपुट फ़ंक्शन इन गुणों का उपयोग करके उपयोगकर्ता द्वारा निर्धारित नीतियों के अनुसार शार्डिंग को संभाल सकता है जो tf.distribute.InputContext ऑब्जेक्ट का हिस्सा हैं।

प्रीफेचिंग

tf. tf.distribute के अंत में एक प्रीफ़ेच परिवर्तन नहीं जोड़ता है। उपयोगकर्ता द्वारा प्रदान किए गए इनपुट फ़ंक्शन द्वारा लौटाया गया tf.data.Dataset

वितरित इटरेटर्स

गैर-वितरित tf.data.Dataset उदाहरणों के समान, आपको tf.distribute.DistributedDataset उदाहरणों पर पुनरावृति करने और tf.distribute.DistributedDataset में तत्वों तक पहुंचने के लिए एक पुनरावर्तक बनाने की आवश्यकता होगी। निम्नलिखित तरीके हैं जिनसे आप एक tf.distribute.DistributedIterator बना सकते हैं और अपने मॉडल को प्रशिक्षित करने के लिए इसका उपयोग कर सकते हैं:

उपयोगों

लूप निर्माण के लिए पाइथोनिक का उपयोग करें

आप tf.distribute.DistributedDataset पर पुनरावृति करने के लिए एक उपयोगकर्ता के अनुकूल tf.distribute.DistributedDataset लूप का उपयोग कर सकते हैं। tf.distribute.DistributedIterator से लौटाए गए तत्व एकल tf.Tensor या tf.distribute.DistributedValues ​​हो सकते हैं जिसमें प्रति प्रतिकृति एक मान होता है। लूप को tf.function के अंदर रखने से प्रदर्शन को बढ़ावा मिलेगा। हालांकि, break और return वर्तमान में tf.distribute.DistributedDataset tf.function अंदर रखा गया है।

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020TensorDataset:29"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

एक स्पष्ट इटरेटर बनाने के लिए iter का उपयोग करें

tf.distribute.DistributedDataset इंस्टेंस में तत्वों पर पुनरावृति करने के लिए, आप उस पर iter API का उपयोग करके tf.distribute.DistributedIterator बना सकते हैं। एक स्पष्ट पुनरावर्तक के साथ, आप निश्चित चरणों के लिए पुनरावृति कर सकते हैं। tf.distribute.DistributedIterator उदाहरण dist_iterator से अगला तत्व प्राप्त करने के लिए, आप next(dist_iterator) , dist_iterator.get_next() , या dist_iterator.get_next_as_optional() पर कॉल कर सकते हैं। पूर्व दो अनिवार्य रूप से समान हैं:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

next() या tf.distribute.DistributedIterator.get_next() के साथ, यदि tf.distribute.DistributedIterator अपने अंत तक पहुँच गया है, तो एक OutOfRange त्रुटि फेंक दी जाएगी। क्लाइंट अजगर की तरफ से त्रुटि पकड़ सकता है और अन्य काम जैसे कि चेकपॉइंटिंग और मूल्यांकन करना जारी रख सकता है। हालांकि, यह काम नहीं करेगा यदि आप एक मेजबान प्रशिक्षण लूप का उपयोग कर रहे हैं (यानी, प्रति tf.function कई चरणों को चलाएं), जो इस तरह दिखता है:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn में tf.range के अंदर स्टेप बॉडी को लपेटकर कई चरण होते हैं। इस मामले में, बिना किसी निर्भरता के लूप में अलग-अलग पुनरावृत्तियों समानांतर में शुरू हो सकते हैं, इसलिए पिछले पुनरावृत्तियों की गणना समाप्त होने से पहले बाद के पुनरावृत्तियों में एक आउटऑफरेंज त्रुटि ट्रिगर की जा सकती है। एक बार OutOfRange त्रुटि होने के बाद, फ़ंक्शन के सभी ऑप्स तुरंत समाप्त कर दिए जाएंगे। यदि यह कुछ ऐसा मामला है जिससे आप बचना चाहते हैं, तो एक विकल्प जो आउटऑफरेंज त्रुटि नहीं फेंकता है वह है tf.distribute.DistributedIterator.get_next_as_optional()get_next_as_optional एक tf.experimental.Optional देता है जिसमें tf.distribute.DistributedIterator समाप्त होने पर अगला तत्व या कोई मान नहीं होता है।

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_0"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:104"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
([0 1], [2 3])
([4 5], [6 7])
([8], [])

element_spec संपत्ति का उपयोग करना

यदि आप एक वितरित डेटासेट के तत्वों को tf.function में पास करते हैं और एक tf.TypeSpec गारंटी चाहते हैं, तो आप tf.function के input_signature तर्क को निर्दिष्ट कर सकते हैं। एक वितरित डेटासेट का आउटपुट tf.distribute.DistributedValues है जो एक डिवाइस या कई डिवाइस के इनपुट का प्रतिनिधित्व कर सकता है। इस वितरित मूल्य के अनुरूप tf.TypeSpec प्राप्त करने के लिए आप वितरित डेटासेट या वितरित इटरेटर ऑब्जेक्ट की element_spec संपत्ति का उपयोग कर सकते हैं।

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\021TensorDataset:122"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

आंशिक बैच

आंशिक बैचों का सामना तब होता है जब tf.data.Dataset उदाहरण जो उपयोगकर्ता बनाते हैं उनमें बैच आकार हो सकते हैं जो प्रतिकृतियों की संख्या से समान रूप से विभाज्य नहीं होते हैं या जब डेटासेट आवृत्ति की कार्डिनैलिटी बैच आकार से विभाज्य नहीं होती है। इसका मतलब यह है कि जब डेटासेट को कई प्रतियों में वितरित किया जाता है, तो कुछ पुनरावृत्तियों पर next कॉल के परिणामस्वरूप OutOfRangeError होगा। इस उपयोग के मामले को संभालने के लिए, tf.distribute उन प्रतिकृतियों पर बैच आकार 0 के डमी बैच देता है जिनके पास संसाधित करने के लिए कोई और डेटा नहीं है।

सिंगल वर्कर केस के लिए, यदि डेटा इटरेटर पर next कॉल द्वारा वापस नहीं किया जाता है, तो 0 बैच आकार के डमी बैच बनाए जाते हैं और डेटासेट में वास्तविक डेटा के साथ उपयोग किए जाते हैं। आंशिक बैचों के मामले में, डेटा के अंतिम वैश्विक बैच में डेटा के डमी बैचों के साथ वास्तविक डेटा होगा। डेटा को संसाधित करने के लिए रुकने की स्थिति अब जाँचती है कि क्या किसी भी प्रतिकृति में डेटा है। यदि किसी भी प्रतिकृति पर कोई डेटा नहीं है, तो आउटऑफरेंज त्रुटि फेंक दी जाती है।

बहु कार्यकर्ता मामले के लिए, प्रत्येक कार्यकर्ता पर डेटा की उपस्थिति का प्रतिनिधित्व करने वाला बूलियन मान क्रॉस प्रतिकृति संचार का उपयोग करके एकत्रित किया जाता है और इसका उपयोग यह पहचानने के लिए किया जाता है कि सभी श्रमिकों ने वितरित डेटासेट को संसाधित करना समाप्त कर दिया है या नहीं। चूंकि इसमें क्रॉस वर्कर संचार शामिल है, इसमें कुछ प्रदर्शन दंड शामिल है।

चेतावनियां

  • एकाधिक कार्यकर्ता सेटअप के साथ tf.distribute.Strategy.experimental_distribute_dataset API का उपयोग करते समय, उपयोगकर्ता एक tf.data.Dataset पास करते हैं जो फ़ाइलों से पढ़ता है। यदि tf.data.experimental.AutoShardPolicy को AUTO या FILE पर सेट किया जाता है, तो वास्तविक प्रति चरण बैच आकार उपयोगकर्ता द्वारा परिभाषित वैश्विक बैच आकार से छोटा हो सकता है। यह तब हो सकता है जब फ़ाइल में शेष तत्व वैश्विक बैच आकार से कम हों। उपयोगकर्ता या तो डेटासेट को चलाने के चरणों की संख्या के आधार पर समाप्त कर सकते हैं या इसके आसपास काम करने के लिए tf.data.experimental.AutoShardPolicy को DATA पर सेट कर सकते हैं।

  • स्टेटफुल डेटासेट ट्रांसफॉर्मेशन वर्तमान में tf.distribute के साथ समर्थित नहीं हैं और डेटासेट के किसी भी स्टेटफुल ऑप्स को वर्तमान में अनदेखा किया जा सकता है। उदाहरण के लिए, यदि आपके डेटासेट में एक map_fn है जो किसी छवि को घुमाने के लिए tf.random.uniform का उपयोग करता है, तो आपके पास एक डेटासेट ग्राफ़ है जो स्थानीय मशीन पर राज्य (यानी यादृच्छिक बीज) पर निर्भर करता है जहां पायथन प्रक्रिया निष्पादित की जा रही है।

  • प्रायोगिक tf.data.experimental.OptimizationOptions जो डिफ़ॉल्ट रूप से अक्षम होते हैं, कुछ संदर्भों में - जैसे कि tf.distribute .वितरण के साथ उपयोग किए जाने पर - प्रदर्शन में गिरावट का कारण बन सकते हैं। आपको उन्हें तभी सक्षम करना चाहिए जब आप यह सत्यापित कर लें कि वे वितरण सेटिंग में आपके कार्यभार के प्रदर्शन को लाभान्वित करते हैं।

  • सामान्य रूप से tf.data के साथ अपनी इनपुट पाइपलाइन को कैसे अनुकूलित करें, इसके लिए कृपया इस गाइड को देखें। कुछ अतिरिक्त टिप्स:

    • यदि आपके पास एक से अधिक कर्मचारी हैं और एक या अधिक ग्लोब पैटर्न से मेल खाने वाली सभी फ़ाइलों से डेटासेट बनाने के लिए tf.data.Dataset.list_files का उपयोग कर रहे हैं, तो याद रखें कि seed तर्क सेट करें या shuffle=False सेट करें ताकि प्रत्येक कार्यकर्ता फ़ाइल को लगातार शार्प करे।

    • यदि आपकी इनपुट पाइपलाइन में रिकॉर्ड स्तर पर डेटा को शफ़ल करना और डेटा को पार्स करना दोनों शामिल हैं, जब तक कि अनपेक्षित डेटा पार्स किए गए डेटा (जो आमतौर पर ऐसा नहीं है) से काफी बड़ा है, पहले फेरबदल करें और फिर पार्स करें, जैसा कि निम्नलिखित उदाहरण में दिखाया गया है। यह स्मृति उपयोग और प्रदर्शन को लाभ पहुंचा सकता है।

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) buffer_size तत्वों के आंतरिक बफर को बनाए रखता है, और इस प्रकार buffer_size को कम करने से OOM समस्या कम हो सकती है।

  • tf.distribute.experimental_distribute_dataset या tf.distribute.distribute_datasets_from_function का उपयोग करते समय श्रमिकों द्वारा डेटा को संसाधित करने के क्रम की गारंटी नहीं है। यह आमतौर पर तब आवश्यक होता है जब आप tf.distribute से स्केल भविष्यवाणी का उपयोग कर रहे हों। हालांकि आप बैच में प्रत्येक तत्व के लिए एक इंडेक्स सम्मिलित कर सकते हैं और तदनुसार आउटपुट ऑर्डर कर सकते हैं। निम्न स्निपेट आउटपुट ऑर्डर करने का एक उदाहरण है।

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}
2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_4"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9223372036854775807
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:162"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

यदि मैं विहित tf.data.Dataset उदाहरण का उपयोग नहीं कर रहा हूँ, तो मैं अपना डेटा कैसे वितरित करूँ?

कभी-कभी उपयोगकर्ता अपने इनपुट का प्रतिनिधित्व करने के लिए tf.data.Dataset का उपयोग नहीं कर सकते हैं और बाद में उपर्युक्त एपीआई का उपयोग डेटासेट को कई उपकरणों में वितरित करने के लिए कर सकते हैं। ऐसे मामलों में आप जनरेटर से कच्चे टेंसर या इनपुट का उपयोग कर सकते हैं।

मनमाना टेंसर इनपुट के लिए प्रयोग_वितरण_वैल्यू_फ्रॉम_फंक्शन का उपयोग करें

strategy.run tf.distribute.DistributedValues ​​स्वीकार करता है जो next(iterator) का आउटपुट है। टेंसर मानों को पास करने के लिए, tf.वितरण का निर्माण करने के लिए experimental_distribute_values_from_function tf.distribute.DistributedValues का उपयोग करें। कच्चे टेंसर से वितरित मान।

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

यदि आपका इनपुट जनरेटर से है तो tf.data.Dataset.from_generator का उपयोग करें

यदि आपके पास एक जनरेटर फ़ंक्शन है जिसका आप उपयोग करना चाहते हैं, तो आप from_generator API का उपयोग करके एक tf.data.Dataset उदाहरण बना सकते हैं।

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2"
op: "FlatMapDataset"
input: "TensorDataset/_1"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: -2
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_flat_map_fn_3980"
    }
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\022FlatMapDataset:178"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 4
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_FLOAT
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.