ParameterServerStrategy के साथ पैरामीटर सर्वर प्रशिक्षण

TensorFlow.org पर देखें Google Colab में चलाएं GitHub पर स्रोत देखें नोटबुक डाउनलोड करें

अवलोकन

कई मशीनों पर मॉडल प्रशिक्षण को बढ़ाने के लिए पैरामीटर सर्वर प्रशिक्षण एक सामान्य डेटा-समानांतर विधि है।

एक पैरामीटर सर्वर प्रशिक्षण क्लस्टर में श्रमिक और पैरामीटर सर्वर होते हैं। चर पैरामीटर सर्वर पर बनाए जाते हैं और उन्हें प्रत्येक चरण में श्रमिकों द्वारा पढ़ा और अद्यतन किया जाता है। डिफ़ॉल्ट रूप से, कार्यकर्ता इन चरों को एक दूसरे के साथ सिंक्रनाइज़ किए बिना स्वतंत्र रूप से पढ़ते और अद्यतन करते हैं। यही कारण है कि कभी-कभी पैरामीटर सर्वर-शैली प्रशिक्षण को अतुल्यकालिक प्रशिक्षण कहा जाता है।

TensorFlow 2 में, पैरामीटर सर्वर प्रशिक्षण tf.distribute.experimental.ParameterServerStrategy वर्ग द्वारा संचालित होता है, जो प्रशिक्षण चरणों को एक क्लस्टर में वितरित करता है जो हजारों श्रमिकों (पैरामीटर सर्वर के साथ) तक स्केल करता है।

समर्थित प्रशिक्षण विधियां

दो मुख्य समर्थित प्रशिक्षण विधियां हैं:

नौकरियों और कार्यों के साथ एक समूह

पसंद के एपीआई ( Model.fit या एक कस्टम ट्रेनिंग लूप) के बावजूद, TensorFlow 2 में वितरित प्रशिक्षण में शामिल हैं: कई 'jobs' के साथ एक 'cluster' , और प्रत्येक जॉब में एक या अधिक 'tasks' हो सकते हैं।

पैरामीटर सर्वर प्रशिक्षण का उपयोग करते समय, इसकी अनुशंसा की जाती है:

  • एक समन्वयक की नौकरी (जिसका नाम chief है)
  • एकाधिक कार्यकर्ता नौकरियां (नौकरी का नाम worker ); तथा
  • एकाधिक पैरामीटर सर्वर कार्य (नौकरी का नाम ps )

जबकि समन्वयक संसाधन बनाता है, प्रशिक्षण कार्यों को भेजता है, चौकियों को लिखता है, और कार्य विफलताओं से निपटता है, कार्यकर्ता और पैरामीटर सर्वर tf.distribute.Server चलाते हैं जो समन्वयक के अनुरोधों को सुनते हैं।

Model.fit API के साथ पैरामीटर सर्वर प्रशिक्षण

Model.fit API के साथ पैरामीटर सर्वर प्रशिक्षण के लिए समन्वयक को इनपुट के रूप में tf.distribute.experimental.ParameterServerStrategy ऑब्जेक्ट और एक tf.keras.utils.experimental.DatasetCreator का उपयोग करने की आवश्यकता होती है। बिना किसी रणनीति के Model.fit उपयोग के समान, या अन्य रणनीतियों के साथ, वर्कफ़्लो में मॉडल बनाना और संकलित करना, कॉलबैक तैयार करना, उसके बाद Model.fit कॉल करना शामिल है।

कस्टम प्रशिक्षण लूप के साथ पैरामीटर सर्वर प्रशिक्षण

कस्टम प्रशिक्षण लूप के साथ, tf.distribute.experimental.coordinator.ClusterCoordinator वर्ग समन्वयक के लिए उपयोग किया जाने वाला प्रमुख घटक है।

  • ClusterCoordinator वर्ग को tf.distribute.Strategy ऑब्जेक्ट के साथ मिलकर काम करने की आवश्यकता है।
  • क्लस्टर की जानकारी प्रदान करने के लिए इस tf.distribute.Strategy ऑब्जेक्ट की आवश्यकता होती है और इसका उपयोग प्रशिक्षण चरण को परिभाषित करने के लिए किया जाता है, जैसा कि tf.distribute.Strategy के साथ कस्टम प्रशिक्षण में दिखाया गया है।
  • ClusterCoordinator ऑब्जेक्ट फिर इन प्रशिक्षण चरणों के निष्पादन को दूरस्थ श्रमिकों को भेजता है।
  • पैरामीटर सर्वर प्रशिक्षण के लिए, क्लस्टर tf.distribute.experimental.ParameterServerStrategy को ClusterCoordinator के साथ काम करने की आवश्यकता है।

ClusterCoordinator ऑब्जेक्ट द्वारा प्रदान किया गया सबसे महत्वपूर्ण API schedule है:

  • schedule एपीआई एक tf.function को एनक्यू करता है और भविष्य जैसा RemoteValue तुरंत लौटाता है।
  • कतारबद्ध कार्यों को पृष्ठभूमि थ्रेड में दूरस्थ श्रमिकों को भेजा जाएगा और उनके RemoteValue s को एसिंक्रोनस रूप से भर दिया जाएगा।
  • चूंकि schedule में वर्कर असाइनमेंट की आवश्यकता नहीं होती है, इसलिए पास किए गए tf.function को किसी भी उपलब्ध वर्कर पर निष्पादित किया जा सकता है।
  • यदि जिस कार्यकर्ता पर इसे निष्पादित किया गया है, वह पूरा होने से पहले अनुपलब्ध हो जाता है, तो फ़ंक्शन को किसी अन्य उपलब्ध कार्यकर्ता पर पुनः प्रयास किया जाएगा।
  • इस तथ्य और इस तथ्य के कारण कि फ़ंक्शन निष्पादन परमाणु नहीं है, एक फ़ंक्शन को एक से अधिक बार निष्पादित किया जा सकता है।

दूरस्थ कार्यों को भेजने के अलावा, ClusterCoordinator सभी श्रमिकों पर डेटासेट बनाने में भी मदद करता है और जब कोई कार्यकर्ता विफलता से ठीक हो जाता है तो इन डेटासेट का पुनर्निर्माण करता है।

ट्यूटोरियल सेटअप

ट्यूटोरियल Model.fit और कस्टम ट्रेनिंग लूप पथों में विभाजित होगा, और आप अपनी आवश्यकताओं के अनुरूप एक चुन सकते हैं। "एक्स के साथ प्रशिक्षण" के अलावा अन्य अनुभाग दोनों पथों पर लागू होते हैं।

pip install portpicker

क्लस्टर सेटअप

जैसा कि ऊपर उल्लेख किया गया है, एक पैरामीटर सर्वर प्रशिक्षण क्लस्टर को एक समन्वयक कार्य की आवश्यकता होती है जो आपके प्रशिक्षण कार्यक्रम को चलाता है, एक या कई कार्यकर्ता और पैरामीटर सर्वर कार्य जो TensorFlow सर्वर चलाते हैं- tf.distribute.Server और संभवतः एक अतिरिक्त मूल्यांकन कार्य जो साइड-कार मूल्यांकन चलाता है (नीचे साइड-कार मूल्यांकन अनुभाग देखें)। उन्हें स्थापित करने की आवश्यकताएं हैं:

  • समन्वयक कार्य को मूल्यांकनकर्ता को छोड़कर अन्य सभी TensorFlow सर्वरों के पते और बंदरगाहों को जानना होगा।
  • श्रमिकों और पैरामीटर सर्वरों को यह जानने की जरूरत है कि उन्हें किस पोर्ट को सुनना है। सादगी के लिए, आप आमतौर पर इन कार्यों पर TensorFlow सर्वर बनाते समय पूरी क्लस्टर जानकारी पास कर सकते हैं।
  • मूल्यांकनकर्ता कार्य को प्रशिक्षण क्लस्टर के सेटअप को जानने की आवश्यकता नहीं है। यदि ऐसा होता है, तो उसे प्रशिक्षण क्लस्टर से जुड़ने का प्रयास नहीं करना चाहिए।
  • श्रमिकों और पैरामीटर सर्वरों में क्रमशः "worker" और "ps" के रूप में कार्य प्रकार होने चाहिए। समन्वयक को विरासत कारणों से कार्य प्रकार के रूप में "chief" का उपयोग करना चाहिए।

इस ट्यूटोरियल में, आप एक इन-प्रोसेस क्लस्टर बनाएंगे ताकि संपूर्ण पैरामीटर सर्वर प्रशिक्षण कोलाब में चलाया जा सके। आप बाद के अनुभाग में वास्तविक समूहों को स्थापित करने का तरीका जानेंगे।

इन-प्रोसेस क्लस्टर

आप पहले से कई TensorFlow सर्वर बनाकर शुरू करेंगे और बाद में उनसे जुड़ेंगे। ध्यान दें कि यह केवल इस ट्यूटोरियल के प्रदर्शन के उद्देश्य के लिए है, और वास्तविक प्रशिक्षण में सर्वर "worker" और "ps" मशीनों पर शुरू किए जाएंगे।

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

इन-प्रोसेस क्लस्टर सेटअप अक्सर यूनिट परीक्षण में उपयोग किया जाता है, जैसे कि यहां

स्थानीय परीक्षण के लिए एक अन्य विकल्प स्थानीय मशीन पर प्रक्रियाओं को लॉन्च करना है - इस दृष्टिकोण के उदाहरण के लिए केरस के साथ बहु-कार्यकर्ता प्रशिक्षण देखें।

एक ParameterServerStrategy को तुरंत चालू करें

इससे पहले कि आप प्रशिक्षण कोड में गोता लगाएँ, आइए एक ParameterServerStrategy ऑब्जेक्ट को इंस्टेंट करें। ध्यान दें कि यह आवश्यक है, भले ही आप Model.fit या कस्टम प्रशिक्षण लूप के साथ आगे बढ़ रहे हों। वेरिएबल_पार्टिशनर तर्क को variable_partitioner शार्डिंग सेक्शन में समझाया जाएगा।

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

प्रशिक्षण के लिए GPU का उपयोग करने के लिए, प्रत्येक कार्यकर्ता को दृश्यमान GPU आवंटित करें। ParameterServerStrategy प्रत्येक कार्यकर्ता पर सभी उपलब्ध GPU का उपयोग करेगा, इस प्रतिबंध के साथ कि सभी श्रमिकों के पास समान संख्या में GPU उपलब्ध होने चाहिए।

वेरिएबल शार्डिंग

वेरिएबल शार्पनिंग एक वेरिएबल को कई छोटे वेरिएबल में विभाजित करने के लिए संदर्भित करता है, जिसे शार्ड्स कहा जाता है। इन शार्क तक पहुँचने पर नेटवर्क लोड को वितरित करने के लिए वेरिएबल शार्डिंग उपयोगी हो सकती है। यह कई पैरामीटर सर्वरों में एक सामान्य चर की गणना और भंडारण को वितरित करने के लिए भी उपयोगी है।

वैरिएबल शार्डिंग को सक्षम करने के लिए, आप एक ParameterServerStrategy ऑब्जेक्ट का निर्माण करते समय एक variable_partitioner में पास कर सकते हैं। वेरिएबल_पार्टिशनर को हर बार लागू किया जाएगा जब एक variable_partitioner बनाया जाएगा और यह वेरिएबल के प्रत्येक आयाम के साथ शार्क की संख्या को वापस करने की उम्मीद है। कुछ आउट-ऑफ़-बॉक्स variable_partitioner s प्रदान किए जाते हैं जैसे tf.distribute.experimental.partitioners.MinSizePartitioner । छोटे चरों को विभाजित करने से बचने के लिए आकार-आधारित विभाजनकर्ताओं जैसे tf.distribute.experimental.partitioners.MinSizePartitioner का उपयोग करने की अनुशंसा की जाती है, जो मॉडल प्रशिक्षण गति पर नकारात्मक प्रभाव डाल सकता है।

जब एक variable_partitioner पास किया जाता है और यदि आप सीधे strategy.scope() के तहत एक वेरिएबल बनाते हैं, तो यह एक variables प्रॉपर्टी के साथ एक कंटेनर टाइप बन जाएगा जो शार्क की सूची तक पहुंच प्रदान करता है। ज्यादातर मामलों में, यह कंटेनर सभी शार्क को जोड़कर स्वचालित रूप से एक टेंसर में परिवर्तित हो जाएगा। नतीजतन, इसे एक सामान्य चर के रूप में इस्तेमाल किया जा सकता है। दूसरी ओर, कुछ TensorFlow विधियाँ जैसे tf.nn.embedding_lookup इस कंटेनर प्रकार के लिए कुशल कार्यान्वयन प्रदान करती हैं और इन विधियों में स्वचालित संयोजन से बचा जाएगा।

अधिक विवरण के लिए कृपया tf.distribute.experimental.ParameterServerStrategy के API दस्तावेज़ देखें।

Model.fit के साथ प्रशिक्षण

Model.fit के माध्यम से एक उपयोग में आसान प्रशिक्षण एपीआई प्रदान करता है जो हुड के नीचे प्रशिक्षण लूप को संभालता है, train_step के लचीलेपन के साथ, और कॉलबैक, जो TensorBoard के लिए चेकपॉइंट सेविंग या सारांश बचत जैसी कार्यक्षमता प्रदान करता है। Model.fit के साथ, रणनीति ऑब्जेक्ट के सरल स्वैप के साथ अन्य रणनीतियों के लिए समान प्रशिक्षण कोड का उपयोग किया जा सकता है।

इनपुट डेटा

Model.fit पैरामीटर सर्वर प्रशिक्षण के साथ आवश्यक है कि इनपुट डेटा एक कॉल करने योग्य में प्रदान किया जाए जो tf.distribute.InputContext प्रकार का एक तर्क लेता है, और एक tf.data.Dataset देता है। फिर, एक tf.keras.utils.experimental.DatasetCreator ऑब्जेक्ट बनाएं जो इस तरह के कॉल करने callable , और एक वैकल्पिक tf.distribute.InputOptions ऑब्जेक्ट को input_options तर्क के माध्यम से लेता है।

ध्यान दें कि पैरामीटर सर्वर प्रशिक्षण के साथ डेटा को फेरबदल और दोहराने की सिफारिश की जाती है, और fit कॉल में steps_per_epoch निर्दिष्ट करें ताकि पुस्तकालय युग की सीमाओं को जान सके।

इनपुट InputContext तर्क के बारे में अधिक जानकारी के लिए कृपया वितरित इनपुट ट्यूटोरियल देखें।

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

dataset_fn में कोड इनपुट डिवाइस पर लागू किया जाएगा, जो आमतौर पर सीपीयू है, प्रत्येक कार्यकर्ता मशीन पर।

मॉडल निर्माण और संकलन

अब, आप एक tf.keras.Model एक तुच्छ tf.keras.models.Sequential .प्रदर्शन उद्देश्यों के लिए अनुक्रमिक मॉडल-उसके बाद एक Model.compile कॉल को घटकों को शामिल करने के लिए बनाएंगे, जैसे कि एक ऑप्टिमाइज़र, मेट्रिक्स, या पैरामीटर जैसे steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

कॉलबैक और प्रशिक्षण

वास्तविक प्रशिक्षण के लिए model.fit को कॉल करने से पहले, आइए सामान्य कार्यों के लिए आवश्यक कॉलबैक तैयार करें, जैसे:

  • ModelCheckpoint : मॉडल के वजन को बचाने के लिए।
  • BackupAndRestore : यह सुनिश्चित करने के लिए कि प्रशिक्षण की प्रगति का स्वचालित रूप से बैकअप लिया जाता है, और यदि क्लस्टर अनुपलब्धता (जैसे गर्भपात या छूट) का अनुभव करता है, तो उसे पुनर्प्राप्त किया जाता है; या
  • TensorBoard : प्रगति रिपोर्ट को सारांश फ़ाइलों में सहेजने के लिए, जो TensorBoard टूल में विज़ुअलाइज़ हो जाती हैं।
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f89984ca890>

ClusterCoordinator के साथ प्रत्यक्ष उपयोग (वैकल्पिक)

यहां तक ​​कि अगर आप Model.fit प्रशिक्षण पथ चुनते हैं, तो आप वैकल्पिक रूप से एक tf.distribute.experimental.coordinator.ClusterCoordinator ऑब्जेक्ट को अन्य कार्यों को शेड्यूल करने के लिए तत्काल कर सकते हैं जिन्हें आप श्रमिकों पर निष्पादित करना चाहते हैं। अधिक विवरण और उदाहरणों के लिए एक कस्टम प्रशिक्षण लूप अनुभाग के साथ प्रशिक्षण देखें।

एक कस्टम प्रशिक्षण लूप के साथ प्रशिक्षण

tf.distribute.Strategy के साथ कस्टम प्रशिक्षण लूप का उपयोग करना प्रशिक्षण लूप को परिभाषित करने के लिए बहुत लचीलापन प्रदान करता है। ऊपर परिभाषित ParameterServerStrategy ( strategy के रूप में) के साथ, आप दूरस्थ श्रमिकों को प्रशिक्षण चरणों के निष्पादन को भेजने के लिए एक tf.distribute.experimental.coordinator.ClusterCoordinator का उपयोग करेंगे।

फिर, आप एक मॉडल बनाएंगे, एक डेटासेट और एक चरण फ़ंक्शन को परिभाषित करेंगे, जैसा कि आपने अन्य tf.distribute.Strategy s के साथ प्रशिक्षण लूप में किया है। आप tf.distribute.Strategy ट्यूटोरियल के साथ कस्टम प्रशिक्षण में अधिक विवरण प्राप्त कर सकते हैं।

कुशल डेटासेट प्रीफ़ेचिंग सुनिश्चित करने के लिए, डिस्पैच ट्रेनिंग स्टेप्स टू रिमोट वर्कर्स सेक्शन में उल्लिखित अनुशंसित वितरित डेटासेट निर्माण एपीआई का उपयोग करें। साथ ही, कामगारों को आवंटित GPU का पूरा लाभ उठाने के लिए worker_fn के अंदर Strategy.run को कॉल करना सुनिश्चित करें। शेष चरण GPU के साथ या उसके बिना प्रशिक्षण के लिए समान हैं।

आइए इन घटकों को निम्नलिखित चरणों में बनाएं:

डेटा सेट करें

सबसे पहले, एक ऐसा फ़ंक्शन लिखें जो एक डेटासेट बनाता है जिसमें केरस प्रीप्रोसेसिंग लेयर्स द्वारा कार्यान्वित प्रीप्रोसेसिंग लॉजिक शामिल होता है।

आप इन परतों को dataset_fn dataset_fn अंदर परिवर्तन लागू करेंगे, क्योंकि आप dataset_fn को एक tf.function में tf.function , जो इसके अंदर चर बनाने की अनुमति नहीं देता है।

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return bool(asarray(a1 == a2).all())

डेटासेट में खिलौना उदाहरण जेनरेट करें:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

फिर, dataset_fn में लिपटे प्रशिक्षण डेटासेट बनाएं:

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

मॉडल बनाएं

इसके बाद, मॉडल और अन्य ऑब्जेक्ट बनाएं। strategy.scope के तहत सभी वेरिएबल बनाना सुनिश्चित करें।

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

आइए पुष्टि करें कि FixedShardsPartitioner के उपयोग ने सभी चरों को दो शार्प में विभाजित किया है और प्रत्येक शार्प को विभिन्न पैरामीटर सर्वरों को सौंपा गया है:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

प्रशिक्षण चरण को परिभाषित करें

तीसरा, tf.function में लिपटे प्रशिक्षण चरण बनाएं:

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

उपरोक्त प्रशिक्षण चरण समारोह में, step_fn में Strategy.run और Strategy.reduce को कॉल करना प्रति कार्यकर्ता कई GPU का समर्थन कर सकता है। यदि श्रमिकों के पास GPU आवंटित किया गया है, तो Strategy.run डेटासेट को कई प्रतिकृतियों पर वितरित करेगा।

दूरस्थ श्रमिकों को प्रशिक्षण चरण भेजें

ParameterServerStrategy द्वारा सभी गणनाओं को परिभाषित करने के बाद, आप tf.distribute.experimental.coordinator.ClusterCoordinator वर्ग का उपयोग संसाधन बनाने और दूरस्थ श्रमिकों को प्रशिक्षण चरणों को वितरित करने के लिए करेंगे।

आइए पहले ClusterCoordinator ऑब्जेक्ट बनाएं और स्ट्रैटेजी ऑब्जेक्ट में पास करें:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

फिर, प्रति-कार्यकर्ता डेटासेट और एक पुनरावर्तक बनाएं। नीचे दिए गए per_worker_dataset_fn में, dataset_fn को strategy.distribute_datasets_from_function में लपेटने की अनुशंसा की जाती है ताकि GPU को बिना किसी बाधा के प्रभावी प्रीफ़ेच किया जा सके।

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

अंतिम चरण ClusterCoordinator.schedule का उपयोग करके दूरस्थ श्रमिकों को गणना वितरित करना है:

  • schedule मेथड एक tf.फंक्शन को एनक्यू करता है और फ्यूचर जैसा tf.function तुरंत RemoteValue है। कतारबद्ध कार्यों को पृष्ठभूमि थ्रेड्स में दूरस्थ श्रमिकों को भेजा जाएगा और RemoteValue को अतुल्यकालिक रूप से भरा जाएगा।
  • join की विधि ( ClusterCoordinator.join ) का उपयोग तब तक प्रतीक्षा करने के लिए किया जा सकता है जब तक कि सभी निर्धारित कार्य निष्पादित नहीं हो जाते।
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.543750.
Finished epoch 1, accuracy is 0.543750.
Finished epoch 2, accuracy is 0.950000.
Finished epoch 3, accuracy is 1.000000.

यहां बताया गया है कि आप RemoteValue का परिणाम कैसे प्राप्त कर सकते हैं:

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

वैकल्पिक रूप से, आप सभी चरणों को लॉन्च कर सकते हैं और पूरा होने की प्रतीक्षा करते हुए कुछ कर सकते हैं:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

इस विशेष उदाहरण के लिए संपूर्ण प्रशिक्षण और सर्विंग वर्कफ़्लो के लिए, कृपया यह परीक्षण देखें।

डेटासेट निर्माण के बारे में अधिक

उपरोक्त कोड में डेटासेट ClusterCoordinator.create_per_worker_dataset API का उपयोग करके बनाया गया है)। यह प्रति कार्यकर्ता एक डेटासेट बनाता है और एक कंटेनर ऑब्जेक्ट देता है। प्रति-कार्यकर्ता पुनरावर्तक बनाने के लिए आप उस पर iter विधि को कॉल कर सकते हैं। प्रति-कार्यकर्ता पुनरावर्तक में प्रति कार्यकर्ता एक पुनरावर्तक होता है और किसी विशेष कार्यकर्ता पर फ़ंक्शन निष्पादित होने से पहले एक कार्यकर्ता के संबंधित स्लाइस को ClusterCoordinator.schedule विधि में पारित फ़ंक्शन के इनपुट तर्क में प्रतिस्थापित किया जाएगा।

वर्तमान में, ClusterCoordinator.schedule विधि मानती है कि कार्यकर्ता समकक्ष हैं और इस प्रकार यह मानता है कि विभिन्न श्रमिकों के डेटासेट समान हैं, सिवाय इसके कि यदि उनमें Dataset.shuffle ऑपरेशन होता है तो उन्हें अलग तरीके से फेरबदल किया जा सकता है। इस वजह से, यह भी अनुशंसा की जाती है कि डेटासेट को अनिश्चित काल तक दोहराया जाए और आप डेटासेट से OutOfRangeError पर भरोसा करने के बजाय चरणों की एक सीमित संख्या शेड्यूल करें।

एक अन्य महत्वपूर्ण नोट यह है कि tf.data डेटासेट कार्य सीमाओं में निहित क्रमांकन और अक्रमांकन का समर्थन नहीं करता है। इसलिए ClusterCoordinator.create_per_worker_dataset को दिए गए फ़ंक्शन के अंदर संपूर्ण डेटासेट बनाना महत्वपूर्ण है।

मूल्यांकन

वितरित प्रशिक्षण में मूल्यांकन लूप को परिभाषित करने और चलाने के एक से अधिक तरीके हैं। नीचे वर्णित अनुसार प्रत्येक के अपने फायदे और नुकसान हैं। यदि आपकी कोई प्राथमिकता नहीं है, तो इनलाइन मूल्यांकन पद्धति की अनुशंसा की जाती है।

इनलाइन मूल्यांकन

इस पद्धति में, समन्वयक प्रशिक्षण और मूल्यांकन के बीच वैकल्पिक करता है और इस प्रकार इसे इनलाइन मूल्यांकन कहा जाता है।

इनलाइन मूल्यांकन के कई लाभ हैं। उदाहरण के लिए:

  • यह बड़े मूल्यांकन मॉडल और मूल्यांकन डेटासेट का समर्थन कर सकता है जो एक एकल कार्य नहीं कर सकता है।
  • अगले युग के प्रशिक्षण के लिए निर्णय लेने के लिए मूल्यांकन परिणामों का उपयोग किया जा सकता है।

इनलाइन मूल्यांकन को लागू करने के दो तरीके हैं: प्रत्यक्ष मूल्यांकन और वितरित मूल्यांकन।

  • प्रत्यक्ष मूल्यांकन : छोटे मॉडल और मूल्यांकन डेटासेट के लिए, समन्वयक समन्वयक पर मूल्यांकन डेटासेट के साथ सीधे वितरित मॉडल पर मूल्यांकन चला सकता है:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
प्लेसहोल्डर26
  • वितरित मूल्यांकन : बड़े मॉडल या डेटासेट के लिए जो सीधे समन्वयक पर चलाना संभव नहीं है, समन्वयक कार्य ClusterCoordinator.schedule / ClusterCoordinator.join विधियों के माध्यम से श्रमिकों को मूल्यांकन कार्यों को वितरित कर सकता है:
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

साइड-कार मूल्यांकन

एक अन्य विधि को साइड-कार मूल्यांकन कहा जाता है जहां आप एक समर्पित मूल्यांकनकर्ता कार्य बनाते हैं जो बार-बार चौकियों को पढ़ता है और नवीनतम चेकपॉइंट पर मूल्यांकन चलाता है। यह आपके प्रशिक्षण कार्यक्रम को जल्दी समाप्त करने की अनुमति देता है यदि आपको मूल्यांकन परिणामों के आधार पर अपने प्रशिक्षण लूप को बदलने की आवश्यकता नहीं है। हालांकि, मूल्यांकन को ट्रिगर करने के लिए अतिरिक्त मूल्यांकनकर्ता कार्य और आवधिक जांच की आवश्यकता होती है। एक संभावित साइड-कार मूल्यांकन लूप निम्नलिखित है:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

वास्तविक दुनिया में क्लस्टर

एक वास्तविक उत्पादन वातावरण में, आप विभिन्न मशीनों पर विभिन्न प्रक्रियाओं में सभी कार्यों को चलाएंगे। प्रत्येक कार्य पर क्लस्टर जानकारी को कॉन्फ़िगर करने का सबसे सरल तरीका है "TF_CONFIG" पर्यावरण चर सेट करना और "TF_CONFIG" को पार्स करने के लिए tf.distribute.cluster_resolver.TFConfigClusterResolver का उपयोग करना।

"TF_CONFIG" पर्यावरण चर के बारे में सामान्य विवरण के लिए, वितरित प्रशिक्षण मार्गदर्शिका देखें।

यदि आप कुबेरनेट्स या अन्य कॉन्फ़िगरेशन टेम्प्लेट का उपयोग करके अपना प्रशिक्षण कार्य शुरू करते हैं, तो बहुत संभावना है कि ये टेम्प्लेट आपके लिए पहले से ही “TF_CONFIG" सेट कर चुके हों।

"TF_CONFIG" पर्यावरण चर सेट करें

मान लें कि आपके पास 3 कर्मचारी और 2 पैरामीटर सर्वर हैं, तो कार्यकर्ता 1 का "TF_CONFIG" हो सकता है:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

मूल्यांकनकर्ता का "TF_CONFIG" हो सकता है:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

मूल्यांकनकर्ता के लिए उपरोक्त "TF_CONFIG" स्ट्रिंग में "cluster" भाग वैकल्पिक है।

यदि आप सभी कार्यों के लिए एक ही बाइनरी का उपयोग करते हैं

यदि आप इन सभी कार्यों को एक बाइनरी का उपयोग करके चलाना पसंद करते हैं, तो आपको शुरुआत में ही अपने प्रोग्राम शाखा को विभिन्न भूमिकाओं में जाने देना होगा:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

निम्न कोड एक TensorFlow सर्वर शुरू करता है और प्रतीक्षा करता है:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

कार्य विफलता को संभालना

कार्यकर्ता विफलता

tf.distribute.experimental.coordinator.ClusterCoordinator या Model.fit कार्यकर्ता विफलता के लिए अंतर्निहित दोष सहिष्णुता प्रदान करते हैं। कार्यकर्ता पुनर्प्राप्ति पर, पहले प्रदान किया गया डेटासेट फ़ंक्शन (या तो ClusterCoordinator.create_per_worker_dataset एक कस्टम प्रशिक्षण लूप के लिए, या tf.keras.utils.experimental.DatasetCreator for Model.fit ) डेटासेट को फिर से बनाने के लिए श्रमिकों पर लागू किया जाएगा।

पैरामीटर सर्वर या समन्वयक विफलता

हालांकि, जब समन्वयक पैरामीटर सर्वर त्रुटि देखता है, तो यह तुरंत एक UnavailableError त्रुटि या AbortedError त्रुटि उत्पन्न करेगा। आप इस मामले में समन्वयक को पुनरारंभ कर सकते हैं। समन्वयक स्वयं भी अनुपलब्ध हो सकता है। इसलिए, प्रशिक्षण प्रगति को न खोने के लिए कुछ टूलिंग की सिफारिश की जाती है:

  • Model.fit के लिए, आपको एक BackupAndRestore कॉलबैक का उपयोग करना चाहिए, जो प्रगति की बचत और बहाली को स्वचालित रूप से संभालता है। उदाहरण के लिए ऊपर कॉलबैक और प्रशिक्षण अनुभाग देखें।

  • एक कस्टम प्रशिक्षण लूप के लिए, आपको समय-समय पर मॉडल चर की जांच करनी चाहिए और प्रशिक्षण शुरू होने से पहले एक चेकपॉइंट से मॉडल चर लोड करना चाहिए, यदि कोई हो। यदि एक ऑप्टिमाइज़र को चेकपॉइंट किया जाता है, तो प्रशिक्षण प्रगति का लगभग optimizer.iterations से अनुमान लगाया जा सकता है:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

RemoteValue लाई जा रही है

यदि कोई फ़ंक्शन सफलतापूर्वक निष्पादित किया जाता है, तो RemoteValue प्राप्त करना सफल होने की गारंटी है। ऐसा इसलिए है क्योंकि वर्तमान में फ़ंक्शन निष्पादित होने के बाद वापसी मूल्य तुरंत समन्वयक को कॉपी किया जाता है। यदि प्रतिलिपि के दौरान कोई कार्यकर्ता विफल रहता है, तो फ़ंक्शन को किसी अन्य उपलब्ध कार्यकर्ता पर पुनः प्रयास किया जाएगा। इसलिए, यदि आप प्रदर्शन के लिए अनुकूलित करना चाहते हैं, तो आप बिना रिटर्न वैल्यू के फ़ंक्शन शेड्यूल कर सकते हैं।

त्रुटि की सूचना देना

एक बार समन्वयक एक त्रुटि देखता है जैसे कि पैरामीटर सर्वर से UnavailableError त्रुटि या अन्य एप्लिकेशन त्रुटियां जैसे कि InvalidArgument से एक tf.debugging.check_numerics , यह त्रुटि को बढ़ाने से पहले सभी लंबित और कतारबद्ध कार्यों को रद्द कर देगा। उनके संबंधित RemoteValue s प्राप्त करने से CancelledError बढ़ जाएगा।

एक त्रुटि उठाए जाने के बाद, समन्वयक उसी त्रुटि या रद्द किए गए कार्यों से कोई त्रुटि नहीं उठाएगा।

प्रदर्शन में सुधार

जब आप ParameterServerStrategy और ClusterResolver के साथ प्रशिक्षण लेते हैं, तो यदि आप प्रदर्शन के मुद्दों को देखते हैं, तो कई संभावित कारण हैं।

एक सामान्य कारण यह है कि पैरामीटर सर्वर में असंतुलित लोड होता है और कुछ भारी-भरकम पैरामीटर सर्वर क्षमता तक पहुँच जाते हैं। कई मूल कारण भी हो सकते हैं। इस समस्या को कम करने के कुछ सरल उपाय इस प्रकार हैं:

  1. ParameterServerStrategy का निर्माण करते समय एक चर_पार्टिशनर निर्दिष्ट करके अपने बड़े मॉडल variable_partitioner को साझा करें।
  2. यदि संभव हो तो एक ही चरण में सभी पैरामीटर सर्वरों के लिए आवश्यक हॉटस्पॉट वैरिएबल बनाने से बचें। उदाहरण के लिए, एक निरंतर सीखने की दर या उपवर्ग tf.keras.optimizers.schedules.LearningRateSchedule का उपयोग ऑप्टिमाइज़र में करें क्योंकि डिफ़ॉल्ट व्यवहार यह है कि सीखने की दर एक विशेष पैरामीटर सर्वर पर रखा गया एक चर बन जाएगा और प्रत्येक चरण में अन्य सभी पैरामीटर सर्वर द्वारा अनुरोध किया जाएगा। .
  3. अपने बड़े शब्दसंग्रहों को केरस प्रीप्रोसेसिंग परतों में भेजने से पहले उन्हें शफ़ल करें।

प्रदर्शन समस्याओं का एक अन्य संभावित कारण समन्वयक है। schedule / join का आपका पहला कार्यान्वयन पायथन-आधारित है और इस प्रकार थ्रेडिंग ओवरहेड हो सकता है। साथ ही समन्वयक और कार्यकर्ताओं के बीच विलंबता बड़ी हो सकती है। यदि यह बात है तो,

  • Model.fit के लिए, आप Model.compile पर Model.compile गए steps_per_execution तर्क को 1 से बड़े मान पर सेट कर सकते हैं।

  • एक कस्टम प्रशिक्षण लूप के लिए, आप कई चरणों को एक tf.function में पैक कर सकते हैं:

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

जैसा कि पुस्तकालय को और अधिक अनुकूलित किया गया है, उम्मीद है कि अधिकांश उपयोगकर्ताओं को भविष्य में मैन्युअल रूप से चरणों को पैक करने की आवश्यकता नहीं होगी।

इसके अलावा, प्रदर्शन में सुधार के लिए एक छोटी सी तरकीब है बिना रिटर्न वैल्यू के कार्यों को शेड्यूल करना जैसा कि ऊपर दिए गए हैंडलिंग टास्क फेल्योर सेक्शन में बताया गया है।

ज्ञात सीमाएं

अधिकांश ज्ञात सीमाएं पहले से ही उपरोक्त अनुभागों में शामिल हैं। यह खंड एक सारांश प्रदान करता है।

ParameterServerStrategy सामान्य

  • os.environment["grpc_fail_fast"]="use_caller" को कोऑर्डिनेटर सहित प्रत्येक कार्य पर गलती सहनशीलता को ठीक से काम करने के लिए आवश्यक है।
  • सिंक्रोनस पैरामीटर सर्वर प्रशिक्षण समर्थित नहीं है।
  • इष्टतम प्रदर्शन प्राप्त करने के लिए आमतौर पर एक ही फ़ंक्शन में कई चरणों को पैक करना आवश्यक होता है।
  • यह tf.saved_model.load के माध्यम से शार्प किए गए चर वाले सहेजे tf.saved_model.load को लोड करने के लिए समर्थित नहीं है। TensorFlow सर्विंग का उपयोग करके ऐसे सहेजे गए_मॉडल को लोड करने पर ध्यान दें, काम करने की उम्मीद है।
  • यह एक चेकपॉइंट लोड करने के लिए समर्थित नहीं है जिसमें शार्प किए गए ऑप्टिमाइज़र स्लॉट वेरिएबल्स को अलग-अलग संख्या में शार्क में लोड किया जा सकता है।
  • यह समन्वयक कार्य को पुनरारंभ किए बिना पैरामीटर सर्वर विफलता से पुनर्प्राप्त करने के लिए समर्थित नहीं है।
  • tf.lookup.StaticHashTable का उपयोग (जो आमतौर पर कुछ केरस प्रीप्रोसेसिंग परतों द्वारा नियोजित होता है, जैसे कि tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookup , और tf.keras.layers.TextVectorization ) पर संसाधनों में परिणाम होता है इस समय समन्वयक पैरामीटर सर्वर प्रशिक्षण के साथ। यह आरपीसी को देखने के लिए कामगारों से लेकर समन्वयक तक के प्रदर्शन पर प्रभाव डालता है। यह संबोधित करने के लिए एक वर्तमान उच्च प्राथमिकता है।

Model.fit विशिष्टता

  • steps_per_epoch में Model.fit तर्क आवश्यक है। आप एक ऐसे मान का चयन कर सकते हैं जो किसी युग में उपयुक्त अंतराल प्रदान करता हो।
  • ParameterServerStrategy के पास कस्टम कॉलबैक के लिए समर्थन नहीं है जिसमें प्रदर्शन कारणों से बैच-स्तरीय कॉल हैं। आपको उन कॉलों को उपयुक्त रूप से चुने गए steps_per_epoch के साथ युग-स्तर की कॉलों में परिवर्तित करना चाहिए, ताकि उन्हें हर steps_per_epoch चरणों की संख्या कहा जा सके। बिल्ट-इन कॉलबैक प्रभावित नहीं होते हैं: उनके बैच-स्तरीय कॉलों को निष्पादक होने के लिए संशोधित किया गया है। ParameterServerStrategy के लिए सहायक बैच-स्तरीय कॉल की योजना बनाई जा रही है।
  • इसी कारण से, अन्य रणनीतियों के विपरीत, प्रगति बार और मेट्रिक्स केवल युग की सीमाओं पर लॉग किए जाते हैं।
  • run_eagerly समर्थित नहीं है।

कस्टम प्रशिक्षण लूप विवरण

  • ClusterCoordinator.schedule किसी डेटासेट के लिए विज़िट की गारंटी का समर्थन नहीं करता है।
  • जब ClusterCoordinator.create_per_worker_dataset का उपयोग किया जाता है, तो पूरे डेटासेट को इसे पास किए गए फ़ंक्शन के अंदर बनाया जाना चाहिए।
  • tf.data.Options द्वारा बनाए गए डेटासेट में ClusterCoordinator.create_per_worker_dataset को अनदेखा किया जाता है।