केरासो के साथ बहु-कार्यकर्ता प्रशिक्षण

TensorFlow.org पर देखें Google Colab में चलाएं GitHub पर स्रोत देखें नोटबुक डाउनलोड करें

अवलोकन

यह ट्यूटोरियल दर्शाता है कि tf.distribute.Strategy API—विशेष रूप से tf.distribute.MultiWorkerMirroredStrategy क्लास का उपयोग करके tf.distribute.Strategy मॉडल और Model.fit API के साथ बहु-कार्यकर्ता वितरित प्रशिक्षण कैसे करें। इस रणनीति की मदद से, एक केरस मॉडल जिसे एकल-कार्यकर्ता पर चलाने के लिए डिज़ाइन किया गया था, न्यूनतम कोड परिवर्तन के साथ कई श्रमिकों पर मूल रूप से काम कर सकता है।

tf.distribute.Strategy API की गहरी समझ में रुचि रखने वालों के लिए, TensorFlow गाइड में वितरित प्रशिक्षण TensorFlow द्वारा समर्थित वितरण रणनीतियों के अवलोकन के लिए उपलब्ध है।

केरस और एक कस्टम प्रशिक्षण लूप के साथ MultiWorkerMirroredStrategy का उपयोग करने का तरीका जानने के लिए, Keras और MultiWorkerMirroredStrategy के साथ कस्टम प्रशिक्षण लूप देखें।

ध्यान दें कि इस ट्यूटोरियल का उद्देश्य दो श्रमिकों के साथ न्यूनतम बहु-कार्यकर्ता उदाहरण प्रदर्शित करना है।

सेट अप

कुछ आवश्यक आयातों के साथ प्रारंभ करें:

import json
import os
import sys

TensorFlow आयात करने से पहले, पर्यावरण में कुछ बदलाव करें:

  1. सभी GPU अक्षम करें। यह सभी एक ही GPU का उपयोग करने की कोशिश कर रहे श्रमिकों द्वारा होने वाली त्रुटियों को रोकता है। एक वास्तविक दुनिया के अनुप्रयोग में, प्रत्येक कार्यकर्ता एक अलग मशीन पर होगा।
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. TF_CONFIG पर्यावरण चर को रीसेट करें (आप इसके बारे में बाद में और जानेंगे):
os.environ.pop('TF_CONFIG', None)
  1. सुनिश्चित करें कि वर्तमान निर्देशिका पायथन के पथ पर है—यह नोटबुक को %%writefile द्वारा लिखी गई फ़ाइलों को बाद में आयात करने की अनुमति देता है:
if '.' not in sys.path:
  sys.path.insert(0, '.')

अब TensorFlow आयात करें:

import tensorflow as tf

डेटासेट और मॉडल परिभाषा

इसके बाद, एक साधारण मॉडल और डेटासेट सेटअप के साथ एक mnist_setup.py फ़ाइल बनाएँ। इस ट्यूटोरियल में कार्यकर्ता प्रक्रियाओं द्वारा इस पायथन फ़ाइल का उपयोग किया जाएगा:

%%writefile mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist_setup.py

एकल कार्यकर्ता पर मॉडल प्रशिक्षण

कम संख्या में युगों के लिए मॉडल को प्रशिक्षित करने का प्रयास करें और यह सुनिश्चित करने के लिए कि सब कुछ सही ढंग से काम करता है, एकल कार्यकर्ता के परिणामों का निरीक्षण करें। जैसे-जैसे प्रशिक्षण आगे बढ़ता है, नुकसान कम होना चाहिए और सटीकता बढ़नी चाहिए।

import mnist_setup

batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Epoch 1/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795
<keras.callbacks.History at 0x7f666a2e4510>

बहु-कार्यकर्ता विन्यास

आइए अब बहु-कार्यकर्ता प्रशिक्षण की दुनिया में प्रवेश करें।

नौकरियों और कार्यों के साथ एक समूह

TensorFlow में, वितरित प्रशिक्षण में शामिल हैं: कई नौकरियों के साथ एक 'cluster' , और प्रत्येक कार्य में एक या अधिक 'task' हो सकते हैं।

आपको कई मशीनों पर प्रशिक्षण के लिए TF_CONFIG कॉन्फ़िगरेशन पर्यावरण चर की आवश्यकता होगी, जिनमें से प्रत्येक की एक अलग भूमिका हो सकती है। TF_CONFIG एक JSON स्ट्रिंग है जिसका उपयोग क्लस्टर के प्रत्येक कार्यकर्ता के लिए क्लस्टर कॉन्फ़िगरेशन को निर्दिष्ट करने के लिए किया जाता है।

TF_CONFIG चर के दो घटक हैं: 'cluster' और 'task'

  • एक 'cluster' सभी श्रमिकों के लिए समान होता है और प्रशिक्षण क्लस्टर के बारे में जानकारी प्रदान करता है, जो विभिन्न प्रकार की नौकरियों से युक्त एक निर्देश है, जैसे 'worker' या 'chief'

    • tf.distribute.MultiWorkerMirroredStrategy के साथ बहु-कार्यकर्ता प्रशिक्षण में, आमतौर पर एक 'worker' होता है जो एक नियमित 'worker' अलावा, एक चेकपॉइंट को सहेजना और TensorBoard के लिए एक सारांश फ़ाइल लिखने जैसी ज़िम्मेदारियाँ लेता है। ऐसे 'worker' को मुख्य कार्यकर्ता (नौकरी के नाम 'chief' साथ) के रूप में संदर्भित किया जाता है।
    • यह 'chief' के लिए 'index' 0 को नियुक्त करने के लिए प्रथागत है (वास्तव में, इस तरह tf.distribute.Strategy .वितरण। रणनीति लागू की जाती है)।
  • एक 'task' वर्तमान कार्य की जानकारी प्रदान करता है और प्रत्येक कार्यकर्ता के लिए अलग होता है। यह उस कार्यकर्ता के 'type' और 'index' को निर्दिष्ट करता है।

नीचे एक उदाहरण विन्यास है:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

यहाँ वही TF_CONFIG है जो JSON स्ट्रिंग के रूप में क्रमबद्ध है:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

ध्यान दें कि tf_config पायथन में सिर्फ एक स्थानीय चर है। प्रशिक्षण कॉन्फ़िगरेशन के लिए इसका उपयोग करने में सक्षम होने के लिए, इस निर्देश को JSON के रूप में क्रमबद्ध करने और TF_CONFIG पर्यावरण चर में रखने की आवश्यकता है।

उपरोक्त उदाहरण कॉन्फ़िगरेशन में, आप कार्य 'type' को 'worker' और कार्य 'index' को 0 पर सेट करते हैं। इसलिए, यह मशीन पहला कार्यकर्ता है। इसे 'chief' कार्यकर्ता के रूप में नियुक्त किया जाएगा और दूसरों की तुलना में अधिक काम करेगा।

उदाहरण के लिए, यह ट्यूटोरियल दिखाता है कि आप localhost पर दो श्रमिकों के साथ एक TF_CONFIG चर कैसे सेट कर सकते हैं।

व्यवहार में, आप बाहरी आईपी पते/बंदरगाहों पर कई कर्मचारी बनाएंगे और तदनुसार प्रत्येक कार्यकर्ता पर एक TF_CONFIG चर सेट करेंगे।

इस ट्यूटोरियल में, आप दो श्रमिकों का उपयोग करेंगे:

  • पहले ( 'chief' ) कार्यकर्ता का TF_CONFIG ऊपर दिखाया गया है।
  • दूसरे कार्यकर्ता के लिए, आप tf_config['task']['index']=1 . सेट करेंगे

नोटबुक में पर्यावरण चर और उपप्रक्रियाएं

उपप्रक्रियाओं को अपने माता-पिता से पर्यावरण चर विरासत में मिलते हैं।

उदाहरण के लिए, आप इस जुपिटर नोटबुक प्रक्रिया में निम्नानुसार एक पर्यावरण चर सेट कर सकते हैं:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

फिर, आप उप-प्रक्रियाओं से पर्यावरण चर का उपयोग कर सकते हैं:

echo ${GREETINGS}
Hello TensorFlow!

अगले भाग में, आप कार्यकर्ता उप-प्रक्रियाओं को TF_CONFIG पास करने के लिए एक समान विधि का उपयोग करेंगे। वास्तविक दुनिया के परिदृश्य में, आप अपनी नौकरियों को इस तरह से लॉन्च नहीं करेंगे, लेकिन यह इस उदाहरण में पर्याप्त है।

सही रणनीति चुनें

TensorFlow में, वितरित प्रशिक्षण के दो मुख्य रूप हैं:

  • तुल्यकालिक प्रशिक्षण , जहां प्रशिक्षण के चरण श्रमिकों और प्रतिकृतियों के बीच समन्वयित होते हैं, और
  • अतुल्यकालिक प्रशिक्षण , जहां प्रशिक्षण चरणों को कड़ाई से समन्वयित नहीं किया जाता है (उदाहरण के लिए, पैरामीटर सर्वर प्रशिक्षण )।

यह ट्यूटोरियल दर्शाता है कि tf.distribute.MultiWorkerMirroredStrategy के उदाहरण का उपयोग करके सिंक्रोनस मल्टी-वर्कर ट्रेनिंग कैसे करें।

MultiWorkerMirroredStrategy सभी कर्मचारियों के लिए प्रत्येक डिवाइस पर मॉडल की परतों में सभी चर की प्रतियां बनाता है। यह सामूहिक संचार के लिए एक TensorFlow op, CollectiveOps का उपयोग करता है, ग्रेडिएंट को एकत्रित करने और चर को सिंक में रखने के लिए। इस रणनीति के बारे में tf. tf.distribute.Strategy मार्गदर्शिका में अधिक विवरण हैं।

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
प्लेसहोल्डर17

MultiWorkerMirroredStrategy tf.distribute.experimental.CommunicationOptions पैरामीटर के माध्यम से कई कार्यान्वयन प्रदान करता है: 1) RING क्रॉस-होस्ट संचार परत के रूप में gRPC का उपयोग करते हुए रिंग-आधारित सामूहिकों को लागू करता है; 2) NCCL को लागू करने के लिए एनवीआईडीआईए कलेक्टिव कम्युनिकेशन लाइब्रेरी का उपयोग करता है; और 3) AUTO रनटाइम के लिए पसंद को टाल देता है। सामूहिक कार्यान्वयन का सबसे अच्छा विकल्प GPU की संख्या और प्रकार और क्लस्टर में नेटवर्क इंटरकनेक्ट पर निर्भर करता है।

मॉडल को प्रशिक्षित करें

tf.distribute.Strategy API के tf.keras में एकीकरण के साथ, आप बहु-कार्यकर्ताओं को प्रशिक्षण वितरित करने के लिए केवल एक ही बदलाव करेंगे, वह है मॉडल बिल्डिंग और model.compile() कॉल को strategy.scope() के अंदर कॉल करना। वितरण रणनीति का दायरा तय करता है कि वेरिएबल कैसे और कहां बनाए जाते हैं, और MultiWorkerMirroredStrategy के मामले में, बनाए गए वेरिएबल MirroredVariable वेरिएबल s हैं, और उन्हें प्रत्येक कार्यकर्ता पर दोहराया जाता है।

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()

वास्तव में MultiWorkerMirroredStrategy के साथ चलने के लिए आपको कार्यकर्ता प्रक्रियाओं को चलाने और उन्हें TF_CONFIG पास करने की आवश्यकता होगी।

पहले लिखी गई mnist_setup.py फ़ाइल की तरह, यहाँ main.py है जिसे प्रत्येक कार्यकर्ता चलाएगा:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

उपरोक्त कोड स्निपेट में ध्यान दें कि global_batch_size , जो Dataset.batch को पास हो जाता है, per_worker_batch_size * num_workers पर सेट है। यह सुनिश्चित करता है कि प्रत्येक कार्यकर्ता श्रमिकों की संख्या की परवाह किए बिना per_worker_batch_size उदाहरणों के बैचों को संसाधित करता है।

वर्तमान निर्देशिका में अब दोनों पायथन फाइलें हैं:

ls *.py
main.py
mnist_setup.py
प्लेसहोल्डर22

तो TF_CONFIG को json-serialize करें और इसे पर्यावरण चर में जोड़ें:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

अब, आप एक कार्यकर्ता प्रक्रिया शुरू कर सकते हैं जो main.py चलाएगी और main.py का उपयोग TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

उपरोक्त आदेश के बारे में ध्यान देने योग्य कुछ बातें हैं:

  1. यह %%bash का उपयोग करता है जो कुछ बैश कमांड चलाने के लिए एक नोटबुक "मैजिक" है।
  2. यह पृष्ठभूमि में bash प्रक्रिया को चलाने के लिए --bg ध्वज का उपयोग करता है, क्योंकि यह कार्यकर्ता समाप्त नहीं होगा। यह शुरू होने से पहले सभी श्रमिकों की प्रतीक्षा करता है।

बैकग्राउंड वर्कर प्रक्रिया इस नोटबुक के आउटपुट को प्रिंट नहीं करेगी, इसलिए &> इसके आउटपुट को एक फाइल पर रीडायरेक्ट करता है ताकि आप बाद में लॉग फाइल में क्या हुआ, इसका निरीक्षण कर सकें।

इसलिए, प्रक्रिया शुरू होने के लिए कुछ सेकंड प्रतीक्षा करें:

import time
time.sleep(10)

अब, निरीक्षण करें कि कार्यकर्ता की लॉग फ़ाइल में अब तक क्या आउटपुट रहा है:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

लॉग फ़ाइल की अंतिम पंक्ति कहनी चाहिए Started server with target: grpc://localhost:12345 । पहला कार्यकर्ता अब तैयार है, और अन्य सभी कार्यकर्ताओं के आगे बढ़ने के लिए तैयार होने की प्रतीक्षा कर रहा है।

इसलिए tf_config को अपडेट करें ताकि दूसरे कार्यकर्ता को पिक-अप करने की प्रक्रिया मिल सके:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

दूसरा कार्यकर्ता लॉन्च करें। यह प्रशिक्षण शुरू करेगा क्योंकि सभी कार्यकर्ता सक्रिय हैं (इसलिए इस प्रक्रिया को पृष्ठभूमि में रखने की कोई आवश्यकता नहीं है):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.

यदि आप पहले कार्यकर्ता द्वारा लिखे गए लॉग को दोबारा जांचते हैं, तो आप सीखेंगे कि उसने उस मॉडल के प्रशिक्षण में भाग लिया था:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901

अप्रत्याशित रूप से, यह इस ट्यूटोरियल की शुरुआत में टेस्ट रन की तुलना में धीमी गति से चला।

एक ही मशीन पर कई कर्मचारियों को चलाने से केवल ओवरहेड जुड़ता है।

यहां लक्ष्य प्रशिक्षण समय में सुधार करना नहीं था, बल्कि केवल बहु-कार्यकर्ता प्रशिक्षण का उदाहरण देना था।

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
35 एल10एन-प्लेसहोल्डर
All background processes were killed.

गहराई से बहु-कार्यकर्ता प्रशिक्षण

अब तक, आपने सीखा है कि बुनियादी बहु-कार्यकर्ता सेटअप कैसे किया जाता है।

शेष ट्यूटोरियल के दौरान, आप अन्य कारकों के बारे में जानेंगे, जो वास्तविक उपयोग के मामलों के लिए उपयोगी या महत्वपूर्ण हो सकते हैं, विस्तार से।

डेटासेट शार्डिंग

बहु-कार्यकर्ता प्रशिक्षण में, अभिसरण और प्रदर्शन सुनिश्चित करने के लिए डेटासेट को विभाजित करने की आवश्यकता होती है।

पिछले अनुभाग में उदाहरण tf.distribute.Strategy API द्वारा प्रदान की गई डिफ़ॉल्ट ऑटोशेयरिंग पर निर्भर करता है। आप tf.data.experimental.AutoShardPolicy of tf.data.experimental.DistributeOptions सेट करके tf.data.experimental.AutoShardPolicy को नियंत्रित कर सकते हैं।

ऑटो-शार्डिंग के बारे में अधिक जानने के लिए, वितरित इनपुट मार्गदर्शिका देखें।

ऑटो शार्डिंग को बंद करने का एक त्वरित उदाहरण यहां दिया गया है, ताकि प्रत्येक प्रतिकृति प्रत्येक उदाहरण को संसाधित करे ( अनुशंसित नहीं ):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

मूल्यांकन

यदि आप Model.fit validation_data पास करते हैं, तो यह प्रत्येक युग के लिए प्रशिक्षण और मूल्यांकन के बीच वैकल्पिक होगा। validation_data लेने वाला मूल्यांकन श्रमिकों के एक ही समूह में वितरित किया जाता है और मूल्यांकन परिणाम सभी श्रमिकों के लिए एकत्रित और उपलब्ध होते हैं।

प्रशिक्षण के समान, सत्यापन डेटासेट स्वचालित रूप से फ़ाइल स्तर पर शार्प हो जाता है। आपको सत्यापन डेटासेट में वैश्विक बैच आकार सेट करना होगा और validation_steps सेट करना होगा।

मूल्यांकन के लिए दोहराए गए डेटासेट की भी सिफारिश की जाती है।

वैकल्पिक रूप से, आप एक अन्य कार्य भी बना सकते हैं जो समय-समय पर चौकियों को पढ़ता है और मूल्यांकन चलाता है। अनुमानक यही करता है। लेकिन यह मूल्यांकन करने का अनुशंसित तरीका नहीं है और इस प्रकार इसके विवरण छोड़े जाते हैं।

प्रदर्शन

अब आपके पास एक Keras मॉडल है जो MultiWorkerMirroredStrategy के साथ कई कर्मचारियों में चलने के लिए पूरी तरह से तैयार है।

बहु-कार्यकर्ता प्रशिक्षण के प्रदर्शन में सुधार करने के लिए, आप निम्न प्रयास कर सकते हैं:

  • tf.distribute.MultiWorkerMirroredStrategy कई सामूहिक संचार कार्यान्वयन प्रदान करता है:

    • RING क्रॉस-होस्ट संचार परत के रूप में जीआरपीसी का उपयोग करते हुए रिंग-आधारित सामूहिकता को लागू करता है।
    • सामूहिक को लागू करने के लिए NCCL एनवीआईडीआईए कलेक्टिव कम्युनिकेशन लाइब्रेरी का उपयोग करता है।
    • AUTO रनटाइम के लिए पसंद को टाल देता है।

    सामूहिक कार्यान्वयन का सबसे अच्छा विकल्प GPU की संख्या, GPU के प्रकार और क्लस्टर में नेटवर्क इंटरकनेक्ट पर निर्भर करता है। स्वचालित पसंद को ओवरराइड करने के लिए, communication_options के कंस्ट्रक्टर के MultiWorkerMirroredStrategy पैरामीटर निर्दिष्ट करें। उदाहरण के लिए:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • यदि संभव हो तो चर को tf.float पर कास्ट करें:

    • आधिकारिक ResNet मॉडल में एक उदाहरण शामिल है कि यह कैसे किया जा सकता है।

दोष सहिष्णुता

तुल्यकालिक प्रशिक्षण में, यदि कोई एक कर्मचारी विफल हो जाता है और कोई विफलता-पुनर्प्राप्ति तंत्र मौजूद नहीं है, तो क्लस्टर विफल हो जाएगा।

tf.distribute.Strategy के साथ tf.distribute.Strategy का उपयोग करना उन मामलों में गलती सहनशीलता के लाभ के साथ आता है जहां श्रमिक मर जाते हैं या अन्यथा अस्थिर होते हैं। आप अपनी पसंद के वितरित फ़ाइल सिस्टम में प्रशिक्षण स्थिति को संरक्षित करके ऐसा कर सकते हैं, जैसे कि उदाहरण के पुनरारंभ होने पर जो पहले विफल हो गया था या छूट गया था, प्रशिक्षण स्थिति पुनर्प्राप्त हो गई है।

जब कोई कार्यकर्ता अनुपलब्ध हो जाता है, तो अन्य कर्मचारी विफल हो जाएंगे (संभवतः एक समय समाप्त होने के बाद)। ऐसे मामलों में, अनुपलब्ध कार्यकर्ता को पुनः आरंभ करने की आवश्यकता होती है, साथ ही अन्य कर्मचारी जो विफल हो गए हैं।

मॉडलचेकपॉइंट कॉलबैक

ModelCheckpoint कॉलबैक अब दोष सहिष्णुता कार्यक्षमता प्रदान नहीं करता है, कृपया इसके बजाय BackupAndRestore कॉलबैक का उपयोग करें।

ModelCheckpoint कॉलबैक का उपयोग अभी भी चौकियों को बचाने के लिए किया जा सकता है। लेकिन इसके साथ, यदि प्रशिक्षण बाधित हो गया या सफलतापूर्वक समाप्त हो गया, तो चेकपॉइंट से प्रशिक्षण जारी रखने के लिए, उपयोगकर्ता मॉडल को मैन्युअल रूप से लोड करने के लिए जिम्मेदार है।

वैकल्पिक रूप से उपयोगकर्ता मॉडल ModelCheckpoint कॉलबैक के बाहर मॉडल/वजन को सहेजना और पुनर्स्थापित करना चुन सकता है।

मॉडल की बचत और लोडिंग

model.save या tf.saved_model.save का उपयोग करके अपने मॉडल को सहेजने के लिए, प्रत्येक कार्यकर्ता के लिए बचत गंतव्य अलग होना चाहिए।

  • गैर-मुख्य कर्मचारियों के लिए, आपको मॉडल को एक अस्थायी निर्देशिका में सहेजना होगा।
  • प्रमुख के लिए, आपको प्रदान की गई मॉडल निर्देशिका में सहेजना होगा।

एक ही स्थान पर लिखने की कोशिश कर रहे कई श्रमिकों के परिणामस्वरूप त्रुटियों को रोकने के लिए कार्यकर्ता पर अस्थायी निर्देशिकाओं को अद्वितीय होना चाहिए।

सभी निर्देशिकाओं में सहेजा गया मॉडल समान है, और आमतौर पर केवल प्रमुख द्वारा सहेजे गए मॉडल को पुनर्स्थापित करने या सेवा देने के लिए संदर्भित किया जाना चाहिए।

आपके पास कुछ सफाई तर्क होना चाहिए जो आपके प्रशिक्षण के पूरा होने के बाद श्रमिकों द्वारा बनाई गई अस्थायी निर्देशिकाओं को हटा देता है।

एक ही समय में प्रमुख और कार्यकर्ताओं पर बचत करने का कारण यह है कि आप चेकपॉइंटिंग के दौरान चर को एकत्रित कर सकते हैं जिसके लिए प्रमुख और कार्यकर्ता दोनों को कम संचार प्रोटोकॉल में भाग लेने की आवश्यकता होती है। दूसरी ओर, प्रमुख और कार्यकर्ताओं को एक ही मॉडल निर्देशिका में सहेजने की अनुमति देने से विवाद के कारण त्रुटियां होंगी।

MultiWorkerMirroredStrategy का उपयोग करते हुए, प्रोग्राम प्रत्येक कार्यकर्ता पर चलाया जाता है, और यह जानने के लिए कि क्या वर्तमान कार्यकर्ता प्रमुख है, यह क्लस्टर रिज़ॉल्वर ऑब्जेक्ट का लाभ उठाता है जिसमें विशेषताएँ task_type और task_id :

  • task_type आपको बताता है कि वर्तमान नौकरी क्या है (उदाहरण के लिए 'worker' )।
  • task_id आपको कार्यकर्ता का पहचानकर्ता बताता है।
  • task_id == 0 वाले कार्यकर्ता को मुख्य कार्यकर्ता के रूप में नामित किया गया है।

नीचे दिए गए कोड स्निपेट में, write_filepath फ़ंक्शन लिखने के लिए फ़ाइल पथ प्रदान करता है, जो कार्यकर्ता के task_id पर निर्भर करता है:

  • मुख्य कार्यकर्ता के लिए ( task_id == 0 के साथ), यह मूल फ़ाइल पथ को लिखता है।
  • अन्य श्रमिकों के लिए, यह एक अस्थायी निर्देशिका बनाता temp_dir - जिसमें लिखने के लिए निर्देशिका पथ में task_id है:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

इसके साथ, अब आप सहेजने के लिए तैयार हैं:

multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

जैसा कि ऊपर वर्णित है, बाद में मॉडल को केवल उस पथ प्रमुख से लोड किया जाना चाहिए जिस पर सहेजा गया है, इसलिए गैर-मुख्य श्रमिकों द्वारा सहेजे गए अस्थायी लोगों को हटा दें:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

अब, जब लोड करने का समय आता है, तो चलिए सुविधाजनक tf.keras.models.load_model API का उपयोग करते हैं, और आगे के काम को जारी रखते हैं।

यहां, मान लें कि प्रशिक्षण को लोड करने और जारी रखने के लिए केवल एकल कार्यकर्ता का उपयोग किया जाता है, जिस स्थिति में आप tf.keras.models.load_model को किसी अन्य strategy.scope() (ध्यान दें कि strategy = tf.distribute.MultiWorkerMirroredStrategy() , जैसा कि पहले परिभाषित किया गया है। ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773
<keras.callbacks.History at 0x7f6669989750>

चेकपॉइंट को सहेजना और बहाल करना

दूसरी ओर, चेकपॉइंटिंग आपको पूरे मॉडल को सहेजे बिना अपने मॉडल के वजन को बचाने और उन्हें पुनर्स्थापित करने की अनुमति देता है।

यहां, आप एक tf.train.Checkpoint जो मॉडल को ट्रैक करता है, जिसे tf.train.CheckpointManager द्वारा प्रबंधित किया जाता है, ताकि केवल नवीनतम चेकपॉइंट संरक्षित रहे:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

एक बार CheckpointManager सेट हो जाने के बाद, अब आप उन चौकियों को सहेजने और निकालने के लिए तैयार हैं जिन्हें गैर-मुख्य कर्मचारियों ने सहेजा था:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

अब, जब आपको मॉडल को पुनर्स्थापित करने की आवश्यकता होती है, तो आप सुविधाजनक tf.train.latest_checkpoint फ़ंक्शन का उपयोग करके सहेजे गए नवीनतम चेकपॉइंट को पा सकते हैं। चौकी को बहाल करने के बाद, आप प्रशिक्षण जारी रख सकते हैं।

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/2
2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938
<keras.callbacks.History at 0x7f6669589850>

बैकअप और पुनर्स्थापना कॉलबैक

tf.keras.callbacks.BackupAndRestore कॉलबैक backup_dir तर्क के तहत एक अस्थायी चेकपॉइंट फ़ाइल में बैकअप BackupAndRestore रिस्टोर के लिए मॉडल और वर्तमान युग संख्या का बैकअप लेकर दोष सहिष्णुता कार्यक्षमता प्रदान करता है। यह प्रत्येक युग के अंत में किया जाता है।

एक बार जब नौकरी बाधित हो जाती है और फिर से शुरू हो जाती है, तो कॉलबैक अंतिम चेकपॉइंट को पुनर्स्थापित कर देता है, और बाधित युग की शुरुआत से प्रशिक्षण जारी रहता है। रुकावट से पहले अधूरे युग में पहले से किए गए किसी भी आंशिक प्रशिक्षण को फेंक दिया जाएगा, ताकि यह अंतिम मॉडल स्थिति को प्रभावित न करे।

इसका उपयोग करने के लिए, Model.fit कॉल पर tf.keras.callbacks.BackupAndRestore का एक उदाहरण प्रदान करें।

MultiWorkerMirroredStrategy के साथ, यदि कोई कार्यकर्ता बाधित हो जाता है, तो पूरा क्लस्टर तब तक रुक जाता है जब तक कि बाधित कार्यकर्ता फिर से शुरू नहीं हो जाता। अन्य कार्यकर्ता भी पुनः आरंभ करेंगे, और बाधित कार्यकर्ता क्लस्टर में फिर से शामिल हो जाएगा। फिर, प्रत्येक कार्यकर्ता चेकपॉइंट फ़ाइल को पढ़ता है जिसे पहले सहेजा गया था और अपनी पूर्व स्थिति को चुनता है, जिससे क्लस्टर को सिंक में वापस आने की अनुमति मिलती है। फिर, प्रशिक्षण जारी है।

BackupAndRestore कॉलबैक प्रशिक्षण स्थिति को बचाने और पुनर्स्थापित करने के लिए CheckpointManager का उपयोग करता है, जो चेकपॉइंट नामक एक फ़ाइल उत्पन्न करता है जो मौजूदा चेकपॉइंट्स को नवीनतम के साथ ट्रैक करता है। इस कारण से, नाम टकराव से बचने के लिए अन्य चौकियों को संग्रहीत करने के लिए backup_dir का पुन: उपयोग नहीं किया जाना चाहिए।

वर्तमान में, BackupAndRestore कॉलबैक बिना किसी रणनीति के एकल-कार्यकर्ता प्रशिक्षण का समर्थन करता है- MirroredStrategy -और मल्टी-वर्कर MultiWorkerMirroredStrategy के साथ बहु-कार्यकर्ता प्रशिक्षण।

बहु-कार्यकर्ता प्रशिक्षण और एकल-कार्यकर्ता प्रशिक्षण दोनों के लिए नीचे दो उदाहरण दिए गए हैं:

# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614
<keras.callbacks.History at 0x7f6669555d90>

यदि आप backup_dir की निर्देशिका का निरीक्षण करते हैं जिसे आपने बैकअप एंड BackupAndRestore में निर्दिष्ट किया है, तो आप कुछ अस्थायी रूप से जेनरेट की गई चेकपॉइंट फाइलें देख सकते हैं। पहले खोए हुए उदाहरणों को पुनर्प्राप्त करने के लिए उन फ़ाइलों की आवश्यकता होती है, और आपके प्रशिक्षण के सफलतापूर्वक बाहर निकलने पर Model.fit के अंत में लाइब्रेरी द्वारा हटा दिया जाएगा।

अतिरिक्त संसाधन

  1. TensorFlow गाइड में वितरित प्रशिक्षण उपलब्ध वितरण रणनीतियों का अवलोकन प्रदान करता है।
  2. Keras और MultiWorkerMirroredStrategy ट्यूटोरियल के साथ कस्टम ट्रेनिंग लूप दिखाता है कि MultiWorkerMirroredStrategy और एक कस्टम ट्रेनिंग लूप का उपयोग कैसे करें।
  3. आधिकारिक मॉडल देखें, जिनमें से कई को कई वितरण रणनीतियों को चलाने के लिए कॉन्फ़िगर किया जा सकता है।
  4. tf.function गाइड के साथ बेहतर प्रदर्शन अन्य रणनीतियों और उपकरणों के बारे में जानकारी प्रदान करता है, जैसे कि TensorFlow Profiler जिसका उपयोग आप अपने TensorFlow मॉडल के प्रदर्शन को अनुकूलित करने के लिए कर सकते हैं।