TensorFlow.org पर देखें | Google Colab में चलाएं | GitHub पर स्रोत देखें | नोटबुक डाउनलोड करें |
अवलोकन
कई मशीनों पर मॉडल प्रशिक्षण को बढ़ाने के लिए पैरामीटर सर्वर प्रशिक्षण एक सामान्य डेटा-समानांतर विधि है।
एक पैरामीटर सर्वर प्रशिक्षण क्लस्टर में श्रमिक और पैरामीटर सर्वर होते हैं। चर पैरामीटर सर्वर पर बनाए जाते हैं और उन्हें प्रत्येक चरण में श्रमिकों द्वारा पढ़ा और अद्यतन किया जाता है। डिफ़ॉल्ट रूप से, कार्यकर्ता इन चरों को एक दूसरे के साथ सिंक्रनाइज़ किए बिना स्वतंत्र रूप से पढ़ते और अद्यतन करते हैं। यही कारण है कि कभी-कभी पैरामीटर सर्वर-शैली प्रशिक्षण को अतुल्यकालिक प्रशिक्षण कहा जाता है।
TensorFlow 2 में, पैरामीटर सर्वर प्रशिक्षण tf.distribute.experimental.ParameterServerStrategy
वर्ग द्वारा संचालित होता है, जो प्रशिक्षण चरणों को एक क्लस्टर में वितरित करता है जो हजारों श्रमिकों (पैरामीटर सर्वर के साथ) तक स्केल करता है।
समर्थित प्रशिक्षण विधियां
दो मुख्य समर्थित प्रशिक्षण विधियां हैं:
-
Model.fit
API, जिसकी अनुशंसा तब की जाती है जब आप उच्च-स्तरीय अमूर्तता और प्रशिक्षण के संचालन को प्राथमिकता देते हैं। - एक कस्टम प्रशिक्षण लूप (आप कस्टम प्रशिक्षण का उल्लेख कर सकते हैं, खरोंच से एक प्रशिक्षण लूप लिखना और अधिक विवरण के लिए केरस और मल्टीवर्कर मिररडस्ट्रेटी के साथ कस्टम प्रशिक्षण लूप ।) कस्टम लूप प्रशिक्षण की सिफारिश तब की जाती है जब आप उनके प्रशिक्षण लूप के विवरण को परिभाषित करना पसंद करते हैं।
नौकरियों और कार्यों के साथ एक समूह
पसंद के एपीआई ( Model.fit
या एक कस्टम ट्रेनिंग लूप) के बावजूद, TensorFlow 2 में वितरित प्रशिक्षण में शामिल हैं: कई 'jobs'
के साथ एक 'cluster'
, और प्रत्येक जॉब में एक या अधिक 'tasks'
हो सकते हैं।
पैरामीटर सर्वर प्रशिक्षण का उपयोग करते समय, इसकी अनुशंसा की जाती है:
- एक समन्वयक की नौकरी (जिसका नाम
chief
है) - एकाधिक कार्यकर्ता नौकरियां (नौकरी का नाम
worker
); तथा - एकाधिक पैरामीटर सर्वर कार्य (नौकरी का नाम
ps
)
जबकि समन्वयक संसाधन बनाता है, प्रशिक्षण कार्यों को भेजता है, चौकियों को लिखता है, और कार्य विफलताओं से निपटता है, कार्यकर्ता और पैरामीटर सर्वर tf.distribute.Server
चलाते हैं जो समन्वयक के अनुरोधों को सुनते हैं।
Model.fit
API के साथ पैरामीटर सर्वर प्रशिक्षण
Model.fit
API के साथ पैरामीटर सर्वर प्रशिक्षण के लिए समन्वयक को इनपुट के रूप में tf.distribute.experimental.ParameterServerStrategy
ऑब्जेक्ट और एक tf.keras.utils.experimental.DatasetCreator
का उपयोग करने की आवश्यकता होती है। बिना किसी रणनीति के Model.fit
उपयोग के समान, या अन्य रणनीतियों के साथ, वर्कफ़्लो में मॉडल बनाना और संकलित करना, कॉलबैक तैयार करना, उसके बाद Model.fit
कॉल करना शामिल है।
कस्टम प्रशिक्षण लूप के साथ पैरामीटर सर्वर प्रशिक्षण
कस्टम प्रशिक्षण लूप के साथ, tf.distribute.experimental.coordinator.ClusterCoordinator
वर्ग समन्वयक के लिए उपयोग किया जाने वाला प्रमुख घटक है।
-
ClusterCoordinator
वर्ग कोtf.distribute.Strategy
ऑब्जेक्ट के साथ मिलकर काम करने की आवश्यकता है। - क्लस्टर की जानकारी प्रदान करने के लिए इस
tf.distribute.Strategy
ऑब्जेक्ट की आवश्यकता होती है और इसका उपयोग प्रशिक्षण चरण को परिभाषित करने के लिए किया जाता है, जैसा कि tf.distribute.Strategy के साथ कस्टम प्रशिक्षण में दिखाया गया है। -
ClusterCoordinator
ऑब्जेक्ट फिर इन प्रशिक्षण चरणों के निष्पादन को दूरस्थ श्रमिकों को भेजता है। - पैरामीटर सर्वर प्रशिक्षण के लिए, क्लस्टर
tf.distribute.experimental.ParameterServerStrategy
कोClusterCoordinator
के साथ काम करने की आवश्यकता है।
ClusterCoordinator
ऑब्जेक्ट द्वारा प्रदान किया गया सबसे महत्वपूर्ण API schedule
है:
-
schedule
एपीआई एकtf.function
को एनक्यू करता है और भविष्य जैसाRemoteValue
तुरंत लौटाता है। - कतारबद्ध कार्यों को पृष्ठभूमि थ्रेड में दूरस्थ श्रमिकों को भेजा जाएगा और उनके
RemoteValue
s को एसिंक्रोनस रूप से भर दिया जाएगा। - चूंकि
schedule
में वर्कर असाइनमेंट की आवश्यकता नहीं होती है, इसलिए पास किए गएtf.function
को किसी भी उपलब्ध वर्कर पर निष्पादित किया जा सकता है। - यदि जिस कार्यकर्ता पर इसे निष्पादित किया गया है, वह पूरा होने से पहले अनुपलब्ध हो जाता है, तो फ़ंक्शन को किसी अन्य उपलब्ध कार्यकर्ता पर पुनः प्रयास किया जाएगा।
- इस तथ्य और इस तथ्य के कारण कि फ़ंक्शन निष्पादन परमाणु नहीं है, एक फ़ंक्शन को एक से अधिक बार निष्पादित किया जा सकता है।
दूरस्थ कार्यों को भेजने के अलावा, ClusterCoordinator
सभी श्रमिकों पर डेटासेट बनाने में भी मदद करता है और जब कोई कार्यकर्ता विफलता से ठीक हो जाता है तो इन डेटासेट का पुनर्निर्माण करता है।
ट्यूटोरियल सेटअप
ट्यूटोरियल Model.fit
और कस्टम ट्रेनिंग लूप पथों में विभाजित होगा, और आप अपनी आवश्यकताओं के अनुरूप एक चुन सकते हैं। "एक्स के साथ प्रशिक्षण" के अलावा अन्य अनुभाग दोनों पथों पर लागू होते हैं।
pip install portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
क्लस्टर सेटअप
जैसा कि ऊपर उल्लेख किया गया है, एक पैरामीटर सर्वर प्रशिक्षण क्लस्टर को एक समन्वयक कार्य की आवश्यकता होती है जो आपके प्रशिक्षण कार्यक्रम को चलाता है, एक या कई कार्यकर्ता और पैरामीटर सर्वर कार्य जो TensorFlow सर्वर चलाते हैं- tf.distribute.Server
और संभवतः एक अतिरिक्त मूल्यांकन कार्य जो साइड-कार मूल्यांकन चलाता है (नीचे साइड-कार मूल्यांकन अनुभाग देखें)। उन्हें स्थापित करने की आवश्यकताएं हैं:
- समन्वयक कार्य को मूल्यांकनकर्ता को छोड़कर अन्य सभी TensorFlow सर्वरों के पते और बंदरगाहों को जानना होगा।
- श्रमिकों और पैरामीटर सर्वरों को यह जानने की जरूरत है कि उन्हें किस पोर्ट को सुनना है। सादगी के लिए, आप आमतौर पर इन कार्यों पर TensorFlow सर्वर बनाते समय पूरी क्लस्टर जानकारी पास कर सकते हैं।
- मूल्यांकनकर्ता कार्य को प्रशिक्षण क्लस्टर के सेटअप को जानने की आवश्यकता नहीं है। यदि ऐसा होता है, तो उसे प्रशिक्षण क्लस्टर से जुड़ने का प्रयास नहीं करना चाहिए।
- श्रमिकों और पैरामीटर सर्वरों में क्रमशः
"worker"
और"ps"
के रूप में कार्य प्रकार होने चाहिए। समन्वयक को विरासत कारणों से कार्य प्रकार के रूप में"chief"
का उपयोग करना चाहिए।
इस ट्यूटोरियल में, आप एक इन-प्रोसेस क्लस्टर बनाएंगे ताकि संपूर्ण पैरामीटर सर्वर प्रशिक्षण कोलाब में चलाया जा सके। आप बाद के अनुभाग में वास्तविक समूहों को स्थापित करने का तरीका जानेंगे।
इन-प्रोसेस क्लस्टर
आप पहले से कई TensorFlow सर्वर बनाकर शुरू करेंगे और बाद में उनसे जुड़ेंगे। ध्यान दें कि यह केवल इस ट्यूटोरियल के प्रदर्शन के उद्देश्य के लिए है, और वास्तविक प्रशिक्षण में सर्वर "worker"
और "ps"
मशीनों पर शुरू किए जाएंगे।
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name="worker",
task_index=i,
config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec,
job_name="ps",
task_index=i,
protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
इन-प्रोसेस क्लस्टर सेटअप अक्सर यूनिट परीक्षण में उपयोग किया जाता है, जैसे कि यहां ।
स्थानीय परीक्षण के लिए एक अन्य विकल्प स्थानीय मशीन पर प्रक्रियाओं को लॉन्च करना है - इस दृष्टिकोण के उदाहरण के लिए केरस के साथ बहु-कार्यकर्ता प्रशिक्षण देखें।
एक ParameterServerStrategy को तुरंत चालू करें
इससे पहले कि आप प्रशिक्षण कोड में गोता लगाएँ, आइए एक ParameterServerStrategy
ऑब्जेक्ट को इंस्टेंट करें। ध्यान दें कि यह आवश्यक है, भले ही आप Model.fit
या कस्टम प्रशिक्षण लूप के साथ आगे बढ़ रहे हों। वेरिएबल_पार्टिशनर तर्क को variable_partitioner
शार्डिंग सेक्शन में समझाया जाएगा।
variable_partitioner = (
tf.distribute.experimental.partitioners.MinSizePartitioner(
min_shard_bytes=(256 << 10),
max_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0' INFO:tensorflow:Number of GPUs on workers: 1
प्रशिक्षण के लिए GPU का उपयोग करने के लिए, प्रत्येक कार्यकर्ता को दृश्यमान GPU आवंटित करें। ParameterServerStrategy
प्रत्येक कार्यकर्ता पर सभी उपलब्ध GPU का उपयोग करेगा, इस प्रतिबंध के साथ कि सभी श्रमिकों के पास समान संख्या में GPU उपलब्ध होने चाहिए।
वेरिएबल शार्डिंग
वेरिएबल शार्पनिंग एक वेरिएबल को कई छोटे वेरिएबल में विभाजित करने के लिए संदर्भित करता है, जिसे शार्ड्स कहा जाता है। इन शार्क तक पहुँचने पर नेटवर्क लोड को वितरित करने के लिए वेरिएबल शार्डिंग उपयोगी हो सकती है। यह कई पैरामीटर सर्वरों में एक सामान्य चर की गणना और भंडारण को वितरित करने के लिए भी उपयोगी है।
वैरिएबल शार्डिंग को सक्षम करने के लिए, आप एक ParameterServerStrategy
ऑब्जेक्ट का निर्माण करते समय एक variable_partitioner
में पास कर सकते हैं। वेरिएबल_पार्टिशनर को हर बार लागू किया जाएगा जब एक variable_partitioner
बनाया जाएगा और यह वेरिएबल के प्रत्येक आयाम के साथ शार्क की संख्या को वापस करने की उम्मीद है। कुछ आउट-ऑफ़-बॉक्स variable_partitioner
s प्रदान किए जाते हैं जैसे tf.distribute.experimental.partitioners.MinSizePartitioner
। छोटे चरों को विभाजित करने से बचने के लिए आकार-आधारित विभाजनकर्ताओं जैसे tf.distribute.experimental.partitioners.MinSizePartitioner
का उपयोग करने की अनुशंसा की जाती है, जो मॉडल प्रशिक्षण गति पर नकारात्मक प्रभाव डाल सकता है।
जब एक variable_partitioner
पास किया जाता है और यदि आप सीधे strategy.scope()
के तहत एक वेरिएबल बनाते हैं, तो यह एक variables
प्रॉपर्टी के साथ एक कंटेनर टाइप बन जाएगा जो शार्क की सूची तक पहुंच प्रदान करता है। ज्यादातर मामलों में, यह कंटेनर सभी शार्क को जोड़कर स्वचालित रूप से एक टेंसर में परिवर्तित हो जाएगा। नतीजतन, इसे एक सामान्य चर के रूप में इस्तेमाल किया जा सकता है। दूसरी ओर, कुछ TensorFlow विधियाँ जैसे tf.nn.embedding_lookup
इस कंटेनर प्रकार के लिए कुशल कार्यान्वयन प्रदान करती हैं और इन विधियों में स्वचालित संयोजन से बचा जाएगा।
अधिक विवरण के लिए कृपया tf.distribute.experimental.ParameterServerStrategy
के API दस्तावेज़ देखें।
Model.fit
के साथ प्रशिक्षण
Model.fit
के माध्यम से एक उपयोग में आसान प्रशिक्षण एपीआई प्रदान करता है जो हुड के नीचे प्रशिक्षण लूप को संभालता है, train_step
के लचीलेपन के साथ, और कॉलबैक, जो TensorBoard के लिए चेकपॉइंट सेविंग या सारांश बचत जैसी कार्यक्षमता प्रदान करता है। Model.fit
के साथ, रणनीति ऑब्जेक्ट के सरल स्वैप के साथ अन्य रणनीतियों के लिए समान प्रशिक्षण कोड का उपयोग किया जा सकता है।
इनपुट डेटा
Model.fit
पैरामीटर सर्वर प्रशिक्षण के साथ आवश्यक है कि इनपुट डेटा एक कॉल करने योग्य में प्रदान किया जाए जो tf.distribute.InputContext
प्रकार का एक तर्क लेता है, और एक tf.data.Dataset
देता है। फिर, एक tf.keras.utils.experimental.DatasetCreator
ऑब्जेक्ट बनाएं जो इस तरह के कॉल करने callable
, और एक वैकल्पिक tf.distribute.InputOptions
ऑब्जेक्ट को input_options
तर्क के माध्यम से लेता है।
ध्यान दें कि पैरामीटर सर्वर प्रशिक्षण के साथ डेटा को फेरबदल और दोहराने की सिफारिश की जाती है, और fit
कॉल में steps_per_epoch
निर्दिष्ट करें ताकि पुस्तकालय युग की सीमाओं को जान सके।
इनपुट InputContext
तर्क के बारे में अधिक जानकारी के लिए कृपया वितरित इनपुट ट्यूटोरियल देखें।
def dataset_fn(input_context):
global_batch_size = 64
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.shard(
input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2)
return dataset
dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
dataset_fn
में कोड इनपुट डिवाइस पर लागू किया जाएगा, जो आमतौर पर सीपीयू है, प्रत्येक कार्यकर्ता मशीन पर।
मॉडल निर्माण और संकलन
अब, आप एक tf.keras.Model
एक तुच्छ tf.keras.models.Sequential
.प्रदर्शन उद्देश्यों के लिए अनुक्रमिक मॉडल-उसके बाद एक Model.compile
कॉल को घटकों को शामिल करने के लिए बनाएंगे, जैसे कि एक ऑप्टिमाइज़र, मेट्रिक्स, या पैरामीटर जैसे steps_per_execution
:
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)
कॉलबैक और प्रशिक्षण
वास्तविक प्रशिक्षण के लिए model.fit
को कॉल करने से पहले, आइए सामान्य कार्यों के लिए आवश्यक कॉलबैक तैयार करें, जैसे:
-
ModelCheckpoint
: मॉडल के वजन को बचाने के लिए। -
BackupAndRestore
: यह सुनिश्चित करने के लिए कि प्रशिक्षण की प्रगति का स्वचालित रूप से बैकअप लिया जाता है, और यदि क्लस्टर अनुपलब्धता (जैसे गर्भपात या छूट) का अनुभव करता है, तो उसे पुनर्प्राप्त किया जाता है; या -
TensorBoard
: प्रगति रिपोर्ट को सारांश फ़ाइलों में सहेजने के लिए, जो TensorBoard टूल में विज़ुअलाइज़ हो जाती हैं।
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=log_dir),
tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step Epoch 2/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step Epoch 3/5 WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. 20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step Epoch 4/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step Epoch 5/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step <keras.callbacks.History at 0x7f89984ca890>
ClusterCoordinator
के साथ प्रत्यक्ष उपयोग (वैकल्पिक)
यहां तक कि अगर आप Model.fit
प्रशिक्षण पथ चुनते हैं, तो आप वैकल्पिक रूप से एक tf.distribute.experimental.coordinator.ClusterCoordinator
ऑब्जेक्ट को अन्य कार्यों को शेड्यूल करने के लिए तत्काल कर सकते हैं जिन्हें आप श्रमिकों पर निष्पादित करना चाहते हैं। अधिक विवरण और उदाहरणों के लिए एक कस्टम प्रशिक्षण लूप अनुभाग के साथ प्रशिक्षण देखें।
एक कस्टम प्रशिक्षण लूप के साथ प्रशिक्षण
tf.distribute.Strategy
के साथ कस्टम प्रशिक्षण लूप का उपयोग करना प्रशिक्षण लूप को परिभाषित करने के लिए बहुत लचीलापन प्रदान करता है। ऊपर परिभाषित ParameterServerStrategy
( strategy
के रूप में) के साथ, आप दूरस्थ श्रमिकों को प्रशिक्षण चरणों के निष्पादन को भेजने के लिए एक tf.distribute.experimental.coordinator.ClusterCoordinator
का उपयोग करेंगे।
फिर, आप एक मॉडल बनाएंगे, एक डेटासेट और एक चरण फ़ंक्शन को परिभाषित करेंगे, जैसा कि आपने अन्य tf.distribute.Strategy
s के साथ प्रशिक्षण लूप में किया है। आप tf.distribute.Strategy ट्यूटोरियल के साथ कस्टम प्रशिक्षण में अधिक विवरण प्राप्त कर सकते हैं।
कुशल डेटासेट प्रीफ़ेचिंग सुनिश्चित करने के लिए, डिस्पैच ट्रेनिंग स्टेप्स टू रिमोट वर्कर्स सेक्शन में उल्लिखित अनुशंसित वितरित डेटासेट निर्माण एपीआई का उपयोग करें। साथ ही, कामगारों को आवंटित GPU का पूरा लाभ उठाने के लिए worker_fn
के अंदर Strategy.run
को कॉल करना सुनिश्चित करें। शेष चरण GPU के साथ या उसके बिना प्रशिक्षण के लिए समान हैं।
आइए इन घटकों को निम्नलिखित चरणों में बनाएं:
डेटा सेट करें
सबसे पहले, एक ऐसा फ़ंक्शन लिखें जो एक डेटासेट बनाता है जिसमें केरस प्रीप्रोसेसिंग लेयर्स द्वारा कार्यान्वित प्रीप्रोसेसिंग लॉजिक शामिल होता है।
आप इन परतों को dataset_fn
dataset_fn
अंदर परिवर्तन लागू करेंगे, क्योंकि आप dataset_fn
को एक tf.function में tf.function
, जो इसके अंदर चर बनाने की अनुमति नहीं देता है।
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=feature_vocab,
mask_token=None)
label_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = tf.keras.layers.Input(
shape=(3,),
dtype=tf.string,
name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = tf.keras.Model(
{"features": raw_feature_input},
feature_id_input)
raw_label_input = tf.keras.layers.Input(
shape=(1,),
dtype=tf.string,
name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = tf.keras.Model(
{"label": raw_label_input},
label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison return bool(asarray(a1 == a2).all())
डेटासेट में खिलौना उदाहरण जेनरेट करें:
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
फिर, dataset_fn
में लिपटे प्रशिक्षण डेटासेट बनाएं:
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
मॉडल बनाएं
इसके बाद, मॉडल और अन्य ऑब्जेक्ट बनाएं। strategy.scope
के तहत सभी वेरिएबल बनाना सुनिश्चित करें।
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with Keras processing layers.
model_input = tf.keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = tf.keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
model = tf.keras.Model({"features": model_input}, dense_output)
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
accuracy = tf.keras.metrics.Accuracy()
आइए पुष्टि करें कि FixedShardsPartitioner
के उपयोग ने सभी चरों को दो शार्प में विभाजित किया है और प्रत्येक शार्प को विभिन्न पैरामीटर सर्वरों को सौंपा गया है:
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
प्रशिक्षण चरण को परिभाषित करें
तीसरा, tf.function
में लिपटे प्रशिक्षण चरण बनाएं:
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
उपरोक्त प्रशिक्षण चरण समारोह में, step_fn
में Strategy.run
और Strategy.reduce
को कॉल करना प्रति कार्यकर्ता कई GPU का समर्थन कर सकता है। यदि श्रमिकों के पास GPU आवंटित किया गया है, तो Strategy.run
डेटासेट को कई प्रतिकृतियों पर वितरित करेगा।
दूरस्थ श्रमिकों को प्रशिक्षण चरण भेजें
ParameterServerStrategy
द्वारा सभी गणनाओं को परिभाषित करने के बाद, आप tf.distribute.experimental.coordinator.ClusterCoordinator
वर्ग का उपयोग संसाधन बनाने और दूरस्थ श्रमिकों को प्रशिक्षण चरणों को वितरित करने के लिए करेंगे।
आइए पहले ClusterCoordinator
ऑब्जेक्ट बनाएं और स्ट्रैटेजी ऑब्जेक्ट में पास करें:
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
फिर, प्रति-कार्यकर्ता डेटासेट और एक पुनरावर्तक बनाएं। नीचे दिए गए per_worker_dataset_fn
में, dataset_fn
को strategy.distribute_datasets_from_function
में लपेटने की अनुशंसा की जाती है ताकि GPU को बिना किसी बाधा के प्रभावी प्रीफ़ेच किया जा सके।
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
अंतिम चरण ClusterCoordinator.schedule
का उपयोग करके दूरस्थ श्रमिकों को गणना वितरित करना है:
-
schedule
मेथड एक tf.फंक्शन को एनक्यू करता है और फ्यूचर जैसाtf.function
तुरंतRemoteValue
है। कतारबद्ध कार्यों को पृष्ठभूमि थ्रेड्स में दूरस्थ श्रमिकों को भेजा जाएगा औरRemoteValue
को अतुल्यकालिक रूप से भरा जाएगा। -
join
की विधि (ClusterCoordinator.join
) का उपयोग तब तक प्रतीक्षा करने के लिए किया जा सकता है जब तक कि सभी निर्धारित कार्य निष्पादित नहीं हो जाते।
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). Finished epoch 0, accuracy is 0.543750. Finished epoch 1, accuracy is 0.543750. Finished epoch 2, accuracy is 0.950000. Finished epoch 3, accuracy is 1.000000.
यहां बताया गया है कि आप RemoteValue
का परिणाम कैसे प्राप्त कर सकते हैं:
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000
वैकल्पिक रूप से, आप सभी चरणों को लॉन्च कर सकते हैं और पूरा होने की प्रतीक्षा करते हुए कुछ कर सकते हैं:
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
इस विशेष उदाहरण के लिए संपूर्ण प्रशिक्षण और सर्विंग वर्कफ़्लो के लिए, कृपया यह परीक्षण देखें।
डेटासेट निर्माण के बारे में अधिक
उपरोक्त कोड में डेटासेट ClusterCoordinator.create_per_worker_dataset
API का उपयोग करके बनाया गया है)। यह प्रति कार्यकर्ता एक डेटासेट बनाता है और एक कंटेनर ऑब्जेक्ट देता है। प्रति-कार्यकर्ता पुनरावर्तक बनाने के लिए आप उस पर iter
विधि को कॉल कर सकते हैं। प्रति-कार्यकर्ता पुनरावर्तक में प्रति कार्यकर्ता एक पुनरावर्तक होता है और किसी विशेष कार्यकर्ता पर फ़ंक्शन निष्पादित होने से पहले एक कार्यकर्ता के संबंधित स्लाइस को ClusterCoordinator.schedule
विधि में पारित फ़ंक्शन के इनपुट तर्क में प्रतिस्थापित किया जाएगा।
वर्तमान में, ClusterCoordinator.schedule
विधि मानती है कि कार्यकर्ता समकक्ष हैं और इस प्रकार यह मानता है कि विभिन्न श्रमिकों के डेटासेट समान हैं, सिवाय इसके कि यदि उनमें Dataset.shuffle
ऑपरेशन होता है तो उन्हें अलग तरीके से फेरबदल किया जा सकता है। इस वजह से, यह भी अनुशंसा की जाती है कि डेटासेट को अनिश्चित काल तक दोहराया जाए और आप डेटासेट से OutOfRangeError
पर भरोसा करने के बजाय चरणों की एक सीमित संख्या शेड्यूल करें।
एक अन्य महत्वपूर्ण नोट यह है कि tf.data
डेटासेट कार्य सीमाओं में निहित क्रमांकन और अक्रमांकन का समर्थन नहीं करता है। इसलिए ClusterCoordinator.create_per_worker_dataset
को दिए गए फ़ंक्शन के अंदर संपूर्ण डेटासेट बनाना महत्वपूर्ण है।
मूल्यांकन
वितरित प्रशिक्षण में मूल्यांकन लूप को परिभाषित करने और चलाने के एक से अधिक तरीके हैं। नीचे वर्णित अनुसार प्रत्येक के अपने फायदे और नुकसान हैं। यदि आपकी कोई प्राथमिकता नहीं है, तो इनलाइन मूल्यांकन पद्धति की अनुशंसा की जाती है।
इनलाइन मूल्यांकन
इस पद्धति में, समन्वयक प्रशिक्षण और मूल्यांकन के बीच वैकल्पिक करता है और इस प्रकार इसे इनलाइन मूल्यांकन कहा जाता है।
इनलाइन मूल्यांकन के कई लाभ हैं। उदाहरण के लिए:
- यह बड़े मूल्यांकन मॉडल और मूल्यांकन डेटासेट का समर्थन कर सकता है जो एक एकल कार्य नहीं कर सकता है।
- अगले युग के प्रशिक्षण के लिए निर्णय लेने के लिए मूल्यांकन परिणामों का उपयोग किया जा सकता है।
इनलाइन मूल्यांकन को लागू करने के दो तरीके हैं: प्रत्यक्ष मूल्यांकन और वितरित मूल्यांकन।
- प्रत्यक्ष मूल्यांकन : छोटे मॉडल और मूल्यांकन डेटासेट के लिए, समन्वयक समन्वयक पर मूल्यांकन डेटासेट के साथ सीधे वितरित मॉडल पर मूल्यांकन चला सकता है:
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = tf.keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000प्लेसहोल्डर26
- वितरित मूल्यांकन : बड़े मॉडल या डेटासेट के लिए जो सीधे समन्वयक पर चलाना संभव नहीं है, समन्वयक कार्य
ClusterCoordinator.schedule
/ClusterCoordinator.join
विधियों के माध्यम से श्रमिकों को मूल्यांकन कार्यों को वितरित कर सकता है:
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = tf.keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources Evaluation accuracy: 1.000000
साइड-कार मूल्यांकन
एक अन्य विधि को साइड-कार मूल्यांकन कहा जाता है जहां आप एक समर्पित मूल्यांकनकर्ता कार्य बनाते हैं जो बार-बार चौकियों को पढ़ता है और नवीनतम चेकपॉइंट पर मूल्यांकन चलाता है। यह आपके प्रशिक्षण कार्यक्रम को जल्दी समाप्त करने की अनुमति देता है यदि आपको मूल्यांकन परिणामों के आधार पर अपने प्रशिक्षण लूप को बदलने की आवश्यकता नहीं है। हालांकि, मूल्यांकन को ट्रिगर करने के लिए अतिरिक्त मूल्यांकनकर्ता कार्य और आवधिक जांच की आवश्यकता होती है। एक संभावित साइड-कार मूल्यांकन लूप निम्नलिखित है:
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epoches)):
break
वास्तविक दुनिया में क्लस्टर
एक वास्तविक उत्पादन वातावरण में, आप विभिन्न मशीनों पर विभिन्न प्रक्रियाओं में सभी कार्यों को चलाएंगे। प्रत्येक कार्य पर क्लस्टर जानकारी को कॉन्फ़िगर करने का सबसे सरल तरीका है "TF_CONFIG"
पर्यावरण चर सेट करना और "TF_CONFIG"
को पार्स करने के लिए tf.distribute.cluster_resolver.TFConfigClusterResolver
का उपयोग करना।
"TF_CONFIG"
पर्यावरण चर के बारे में सामान्य विवरण के लिए, वितरित प्रशिक्षण मार्गदर्शिका देखें।
यदि आप कुबेरनेट्स या अन्य कॉन्फ़िगरेशन टेम्प्लेट का उपयोग करके अपना प्रशिक्षण कार्य शुरू करते हैं, तो बहुत संभावना है कि ये टेम्प्लेट आपके लिए पहले से ही “TF_CONFIG"
सेट कर चुके हों।
"TF_CONFIG"
पर्यावरण चर सेट करें
मान लें कि आपके पास 3 कर्मचारी और 2 पैरामीटर सर्वर हैं, तो कार्यकर्ता 1 का "TF_CONFIG"
हो सकता है:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
मूल्यांकनकर्ता का "TF_CONFIG"
हो सकता है:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
मूल्यांकनकर्ता के लिए उपरोक्त "TF_CONFIG"
स्ट्रिंग में "cluster"
भाग वैकल्पिक है।
यदि आप सभी कार्यों के लिए एक ही बाइनरी का उपयोग करते हैं
यदि आप इन सभी कार्यों को एक बाइनरी का उपयोग करके चलाना पसंद करते हैं, तो आपको शुरुआत में ही अपने प्रोग्राम शाखा को विभिन्न भूमिकाओं में जाने देना होगा:
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# Run side-car evaluation
else:
# Run the coordinator.
निम्न कोड एक TensorFlow सर्वर शुरू करता है और प्रतीक्षा करता है:
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
कार्य विफलता को संभालना
कार्यकर्ता विफलता
tf.distribute.experimental.coordinator.ClusterCoordinator
या Model.fit
कार्यकर्ता विफलता के लिए अंतर्निहित दोष सहिष्णुता प्रदान करते हैं। कार्यकर्ता पुनर्प्राप्ति पर, पहले प्रदान किया गया डेटासेट फ़ंक्शन (या तो ClusterCoordinator.create_per_worker_dataset
एक कस्टम प्रशिक्षण लूप के लिए, या tf.keras.utils.experimental.DatasetCreator
for Model.fit
) डेटासेट को फिर से बनाने के लिए श्रमिकों पर लागू किया जाएगा।
पैरामीटर सर्वर या समन्वयक विफलता
हालांकि, जब समन्वयक पैरामीटर सर्वर त्रुटि देखता है, तो यह तुरंत एक UnavailableError
त्रुटि या AbortedError
त्रुटि उत्पन्न करेगा। आप इस मामले में समन्वयक को पुनरारंभ कर सकते हैं। समन्वयक स्वयं भी अनुपलब्ध हो सकता है। इसलिए, प्रशिक्षण प्रगति को न खोने के लिए कुछ टूलिंग की सिफारिश की जाती है:
Model.fit
के लिए, आपको एकBackupAndRestore
कॉलबैक का उपयोग करना चाहिए, जो प्रगति की बचत और बहाली को स्वचालित रूप से संभालता है। उदाहरण के लिए ऊपर कॉलबैक और प्रशिक्षण अनुभाग देखें।एक कस्टम प्रशिक्षण लूप के लिए, आपको समय-समय पर मॉडल चर की जांच करनी चाहिए और प्रशिक्षण शुरू होने से पहले एक चेकपॉइंट से मॉडल चर लोड करना चाहिए, यदि कोई हो। यदि एक ऑप्टिमाइज़र को चेकपॉइंट किया जाता है, तो प्रशिक्षण प्रगति का लगभग
optimizer.iterations
से अनुमान लगाया जा सकता है:
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epoches):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
RemoteValue
लाई जा रही है
यदि कोई फ़ंक्शन सफलतापूर्वक निष्पादित किया जाता है, तो RemoteValue
प्राप्त करना सफल होने की गारंटी है। ऐसा इसलिए है क्योंकि वर्तमान में फ़ंक्शन निष्पादित होने के बाद वापसी मूल्य तुरंत समन्वयक को कॉपी किया जाता है। यदि प्रतिलिपि के दौरान कोई कार्यकर्ता विफल रहता है, तो फ़ंक्शन को किसी अन्य उपलब्ध कार्यकर्ता पर पुनः प्रयास किया जाएगा। इसलिए, यदि आप प्रदर्शन के लिए अनुकूलित करना चाहते हैं, तो आप बिना रिटर्न वैल्यू के फ़ंक्शन शेड्यूल कर सकते हैं।
त्रुटि की सूचना देना
एक बार समन्वयक एक त्रुटि देखता है जैसे कि पैरामीटर सर्वर से UnavailableError
त्रुटि या अन्य एप्लिकेशन त्रुटियां जैसे कि InvalidArgument
से एक tf.debugging.check_numerics
, यह त्रुटि को बढ़ाने से पहले सभी लंबित और कतारबद्ध कार्यों को रद्द कर देगा। उनके संबंधित RemoteValue
s प्राप्त करने से CancelledError
बढ़ जाएगा।
एक त्रुटि उठाए जाने के बाद, समन्वयक उसी त्रुटि या रद्द किए गए कार्यों से कोई त्रुटि नहीं उठाएगा।
प्रदर्शन में सुधार
जब आप ParameterServerStrategy
और ClusterResolver
के साथ प्रशिक्षण लेते हैं, तो यदि आप प्रदर्शन के मुद्दों को देखते हैं, तो कई संभावित कारण हैं।
एक सामान्य कारण यह है कि पैरामीटर सर्वर में असंतुलित लोड होता है और कुछ भारी-भरकम पैरामीटर सर्वर क्षमता तक पहुँच जाते हैं। कई मूल कारण भी हो सकते हैं। इस समस्या को कम करने के कुछ सरल उपाय इस प्रकार हैं:
-
ParameterServerStrategy
का निर्माण करते समय एक चर_पार्टिशनर निर्दिष्ट करके अपने बड़े मॉडलvariable_partitioner
को साझा करें। - यदि संभव हो तो एक ही चरण में सभी पैरामीटर सर्वरों के लिए आवश्यक हॉटस्पॉट वैरिएबल बनाने से बचें। उदाहरण के लिए, एक निरंतर सीखने की दर या उपवर्ग
tf.keras.optimizers.schedules.LearningRateSchedule
का उपयोग ऑप्टिमाइज़र में करें क्योंकि डिफ़ॉल्ट व्यवहार यह है कि सीखने की दर एक विशेष पैरामीटर सर्वर पर रखा गया एक चर बन जाएगा और प्रत्येक चरण में अन्य सभी पैरामीटर सर्वर द्वारा अनुरोध किया जाएगा। . - अपने बड़े शब्दसंग्रहों को केरस प्रीप्रोसेसिंग परतों में भेजने से पहले उन्हें शफ़ल करें।
प्रदर्शन समस्याओं का एक अन्य संभावित कारण समन्वयक है। schedule
/ join
का आपका पहला कार्यान्वयन पायथन-आधारित है और इस प्रकार थ्रेडिंग ओवरहेड हो सकता है। साथ ही समन्वयक और कार्यकर्ताओं के बीच विलंबता बड़ी हो सकती है। यदि यह बात है तो,
Model.fit
के लिए, आप Model.compile परModel.compile
गएsteps_per_execution
तर्क को 1 से बड़े मान पर सेट कर सकते हैं।एक कस्टम प्रशिक्षण लूप के लिए, आप कई चरणों को एक
tf.function
में पैक कर सकते हैं:
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
जैसा कि पुस्तकालय को और अधिक अनुकूलित किया गया है, उम्मीद है कि अधिकांश उपयोगकर्ताओं को भविष्य में मैन्युअल रूप से चरणों को पैक करने की आवश्यकता नहीं होगी।
इसके अलावा, प्रदर्शन में सुधार के लिए एक छोटी सी तरकीब है बिना रिटर्न वैल्यू के कार्यों को शेड्यूल करना जैसा कि ऊपर दिए गए हैंडलिंग टास्क फेल्योर सेक्शन में बताया गया है।
ज्ञात सीमाएं
अधिकांश ज्ञात सीमाएं पहले से ही उपरोक्त अनुभागों में शामिल हैं। यह खंड एक सारांश प्रदान करता है।
ParameterServerStrategy
सामान्य
-
os.environment["grpc_fail_fast"]="use_caller"
को कोऑर्डिनेटर सहित प्रत्येक कार्य पर गलती सहनशीलता को ठीक से काम करने के लिए आवश्यक है। - सिंक्रोनस पैरामीटर सर्वर प्रशिक्षण समर्थित नहीं है।
- इष्टतम प्रदर्शन प्राप्त करने के लिए आमतौर पर एक ही फ़ंक्शन में कई चरणों को पैक करना आवश्यक होता है।
- यह tf.saved_model.load के माध्यम से शार्प किए गए चर वाले सहेजे
tf.saved_model.load
को लोड करने के लिए समर्थित नहीं है। TensorFlow सर्विंग का उपयोग करके ऐसे सहेजे गए_मॉडल को लोड करने पर ध्यान दें, काम करने की उम्मीद है। - यह एक चेकपॉइंट लोड करने के लिए समर्थित नहीं है जिसमें शार्प किए गए ऑप्टिमाइज़र स्लॉट वेरिएबल्स को अलग-अलग संख्या में शार्क में लोड किया जा सकता है।
- यह समन्वयक कार्य को पुनरारंभ किए बिना पैरामीटर सर्वर विफलता से पुनर्प्राप्त करने के लिए समर्थित नहीं है।
-
tf.lookup.StaticHashTable
का उपयोग (जो आमतौर पर कुछ केरस प्रीप्रोसेसिंग परतों द्वारा नियोजित होता है, जैसे किtf.keras.layers.IntegerLookup
,tf.keras.layers.StringLookup
, औरtf.keras.layers.TextVectorization
) पर संसाधनों में परिणाम होता है इस समय समन्वयक पैरामीटर सर्वर प्रशिक्षण के साथ। यह आरपीसी को देखने के लिए कामगारों से लेकर समन्वयक तक के प्रदर्शन पर प्रभाव डालता है। यह संबोधित करने के लिए एक वर्तमान उच्च प्राथमिकता है।
Model.fit
विशिष्टता
-
steps_per_epoch
मेंModel.fit
तर्क आवश्यक है। आप एक ऐसे मान का चयन कर सकते हैं जो किसी युग में उपयुक्त अंतराल प्रदान करता हो। -
ParameterServerStrategy
के पास कस्टम कॉलबैक के लिए समर्थन नहीं है जिसमें प्रदर्शन कारणों से बैच-स्तरीय कॉल हैं। आपको उन कॉलों को उपयुक्त रूप से चुने गएsteps_per_epoch
के साथ युग-स्तर की कॉलों में परिवर्तित करना चाहिए, ताकि उन्हें हरsteps_per_epoch
चरणों की संख्या कहा जा सके। बिल्ट-इन कॉलबैक प्रभावित नहीं होते हैं: उनके बैच-स्तरीय कॉलों को निष्पादक होने के लिए संशोधित किया गया है।ParameterServerStrategy
के लिए सहायक बैच-स्तरीय कॉल की योजना बनाई जा रही है। - इसी कारण से, अन्य रणनीतियों के विपरीत, प्रगति बार और मेट्रिक्स केवल युग की सीमाओं पर लॉग किए जाते हैं।
-
run_eagerly
समर्थित नहीं है।
कस्टम प्रशिक्षण लूप विवरण
-
ClusterCoordinator.schedule
किसी डेटासेट के लिए विज़िट की गारंटी का समर्थन नहीं करता है। - जब
ClusterCoordinator.create_per_worker_dataset
का उपयोग किया जाता है, तो पूरे डेटासेट को इसे पास किए गए फ़ंक्शन के अंदर बनाया जाना चाहिए। -
tf.data.Options
द्वारा बनाए गए डेटासेट मेंClusterCoordinator.create_per_worker_dataset
को अनदेखा किया जाता है।