عرض على TensorFlow.org | تشغيل في Google Colab | عرض المصدر على جيثب | تحميل دفتر |
ملخص
يوضح هذا البرنامج التعليمي كيف يمكن استخدام tf.distribute.Strategy
لتوزيع تدريب متعدد العمال باستخدام tf.estimator
. إذا قمت بكتابة الكود الخاص بك باستخدام tf.estimator
، وكنت مهتمًا بالتوسع إلى ما هو أبعد من جهاز واحد بأداء عالٍ ، فهذا البرنامج التعليمي يناسبك.
قبل البدء ، يرجى قراءة دليل إستراتيجية التوزيع . يعد البرنامج التعليمي التدريبي متعدد GPU مناسبًا أيضًا ، لأن هذا البرنامج التعليمي يستخدم نفس النموذج.
يثبت
أولاً ، قم بإعداد TensorFlow والواردات الضرورية.
import tensorflow_datasets as tfds
import tensorflow as tf
import os, json
tf.compat.v1.disable_eager_execution()
وظيفة الإدخال
يستخدم هذا البرنامج التعليمي مجموعة بيانات MNIST من مجموعات بيانات TensorFlow . يشبه الكود هنا البرنامج التعليمي التدريبي متعدد وحدات معالجة الرسومات مع اختلاف رئيسي واحد: عند استخدام Estimator لتدريب متعدد العمال ، من الضروري تقسيم مجموعة البيانات حسب عدد العمال لضمان تقارب النموذج. يتم تقسيم بيانات الإدخال بواسطة فهرس العامل ، بحيث يعالج كل عامل 1/num_workers
أجزاء مميزة من مجموعة البيانات.
BUFFER_SIZE = 10000
BATCH_SIZE = 64
def input_fn(mode, input_context=None):
datasets, info = tfds.load(name='mnist',
with_info=True,
as_supervised=True)
mnist_dataset = (datasets['train'] if mode == tf.estimator.ModeKeys.TRAIN else
datasets['test'])
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label
if input_context:
mnist_dataset = mnist_dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
return mnist_dataset.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
هناك طريقة أخرى معقولة لتحقيق التقارب وهي خلط مجموعة البيانات ببذور مميزة لكل عامل.
تكوين متعدد العمال
أحد الاختلافات الرئيسية في هذا البرنامج التعليمي (مقارنة بالبرنامج التعليمي التدريبي متعدد وحدات معالجة الرسومات ) هو الإعداد متعدد العمال. متغير البيئة TF_CONFIG
هو الطريقة القياسية لتحديد تكوين الكتلة لكل عامل يمثل جزءًا من الكتلة.
هناك نوعان من مكونات TF_CONFIG
: cluster
task
. يوفر cluster
معلومات حول الكتلة بأكملها ، أي العمال وخوادم المعلمات في الكتلة. task
توفر معلومات حول المهمة الحالية. cluster
المكونات الأولى هي نفسها لجميع العاملين وخوادم المعلمات في المجموعة ، وتختلف task
المكون الثاني على كل عامل وخادم معلمة وتحدد type
index
الخاصين به. في هذا المثال ، type
المهمة worker
index
المهمة هو 0
.
لأغراض التوضيح ، يوضح هذا البرنامج التعليمي كيفية تعيين TF_CONFIG
مع عاملين على localhost
المحلي. من الناحية العملية ، يمكنك إنشاء عدة عمال على عنوان IP خارجي ومنفذ ، وتعيين TF_CONFIG
على كل عامل بشكل مناسب ، أي تعديل index
المهام.
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:12345", "localhost:23456"]
},
'task': {'type': 'worker', 'index': 0}
})
حدد النموذج
اكتب الطبقات والمحسن ووظيفة الخسارة للتدريب. يحدد هذا البرنامج التعليمي النموذج باستخدام طبقات Keras ، على غرار البرنامج التعليمي للتدريب على وحدات معالجة الرسومات المتعددة .
LEARNING_RATE = 1e-4
def model_fn(features, labels, mode):
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])
logits = model(features, training=False)
if mode == tf.estimator.ModeKeys.PREDICT:
predictions = {'logits': logits}
return tf.estimator.EstimatorSpec(labels=labels, predictions=predictions)
optimizer = tf.compat.v1.train.GradientDescentOptimizer(
learning_rate=LEARNING_RATE)
loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction=tf.keras.losses.Reduction.NONE)(labels, logits)
loss = tf.reduce_sum(loss) * (1. / BATCH_SIZE)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.estimator.EstimatorSpec(mode, loss=loss)
return tf.estimator.EstimatorSpec(
mode=mode,
loss=loss,
train_op=optimizer.minimize(
loss, tf.compat.v1.train.get_or_create_global_step()))
MultiWorkerMirroredStrategy
لتدريب النموذج ، استخدم مثيل tf.distribute.experimental.MultiWorkerMirroredStrategy
. تُنشئ MultiWorkerMirroredStrategy
نسخًا من جميع المتغيرات في طبقات النموذج على كل جهاز عبر جميع العاملين. يستخدم CollectiveOps
، عملية TensorFlow للاتصال الجماعي ، لتجميع التدرجات اللونية والحفاظ على تزامن المتغيرات. يحتوي دليل tf.distribute.Strategy
على مزيد من التفاصيل حول هذه الإستراتيجية.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
WARNING:tensorflow:From /tmp/ipykernel_7505/349189047.py:1: _CollectiveAllReduceStrategyExperimental.__init__ (from tensorflow.python.distribute.collective_all_reduce_strategy) is deprecated and will be removed in a future version. Instructions for updating: use distribute.MultiWorkerMirroredStrategy instead INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO
تدريب وتقييم النموذج
بعد ذلك ، حدد استراتيجية التوزيع في RunConfig
، وقم بالتدريب والتقييم عن طريق استدعاء tf.estimator.train_and_evaluate
. يوزع هذا البرنامج التعليمي التدريب فقط عن طريق تحديد الإستراتيجية عبر train_distribute
. من الممكن أيضًا توزيع التقييم عبر eval_distribute
.
config = tf.estimator.RunConfig(train_distribute=strategy)
classifier = tf.estimator.Estimator(
model_fn=model_fn, model_dir='/tmp/multiworker', config=config)
tf.estimator.train_and_evaluate(
classifier,
train_spec=tf.estimator.TrainSpec(input_fn=input_fn),
eval_spec=tf.estimator.EvalSpec(input_fn=input_fn)
)
INFO:tensorflow:Initializing RunConfig with distribution strategies. INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Using config: {'_model_dir': '/tmp/multiworker', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } , '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': <tensorflow.python.distribute.collective_all_reduce_strategy._CollectiveAllReduceStrategyExperimental object at 0x7f3404234490>, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1, '_distribute_coordinator_mode': None} INFO:tensorflow:Not using Distribute Coordinator. INFO:tensorflow:Running training and evaluation locally (non-distributed). INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/estimator.py:1244: StrategyBase.configure (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: use `update_config_proto` instead. INFO:tensorflow:The `input_fn` accepts an `input_context` which will be given by DistributionStrategy INFO:tensorflow:Calling model_fn. /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:449: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options. warnings.warn("To make it possible to preserve tf.data options across " INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Create CheckpointSaverHook. INFO:tensorflow:Create CheckpointSaverHook. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version. Instructions for updating: Use the iterator's `initializer` property instead. WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow_estimator/python/estimator/util.py:95: DistributedIteratorV1.initialize (from tensorflow.python.distribute.v1.input_lib) is deprecated and will be removed in a future version. Instructions for updating: Use the iterator's `initializer` property instead. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0... INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Saving checkpoints for 0 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0... 2022-01-26 05:29:43.503603: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorFromStringHandle' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorFromStringHandle} } . Registered: device='CPU' 2022-01-26 05:29:43.504873: W tensorflow/core/grappler/utils/graph_view.cc:836] No registered 'MultiDeviceIteratorGetNextFromShard' OpKernel for GPU devices compatible with node { {node MultiDeviceIteratorGetNextFromShard} } . Registered: device='CPU' INFO:tensorflow:loss = 2.292878, step = 0 INFO:tensorflow:loss = 2.292878, step = 0 INFO:tensorflow:global_step/sec: 173.275 INFO:tensorflow:global_step/sec: 173.275 INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec) INFO:tensorflow:loss = 2.29561, step = 100 (0.579 sec) INFO:tensorflow:global_step/sec: 189.057 INFO:tensorflow:global_step/sec: 189.057 INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec) INFO:tensorflow:loss = 2.2644367, step = 200 (0.529 sec) INFO:tensorflow:global_step/sec: 193.075 INFO:tensorflow:global_step/sec: 193.075 INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec) INFO:tensorflow:loss = 2.2662685, step = 300 (0.517 sec) INFO:tensorflow:global_step/sec: 199.957 INFO:tensorflow:global_step/sec: 199.957 INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec) INFO:tensorflow:loss = 2.2667098, step = 400 (0.500 sec) INFO:tensorflow:global_step/sec: 204.217 INFO:tensorflow:global_step/sec: 204.217 INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec) INFO:tensorflow:loss = 2.251912, step = 500 (0.490 sec) INFO:tensorflow:global_step/sec: 201.747 INFO:tensorflow:global_step/sec: 201.747 INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec) INFO:tensorflow:loss = 2.2633677, step = 600 (0.496 sec) INFO:tensorflow:global_step/sec: 206.079 INFO:tensorflow:global_step/sec: 206.079 INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec) INFO:tensorflow:loss = 2.2531767, step = 700 (0.485 sec) INFO:tensorflow:global_step/sec: 231.299 INFO:tensorflow:global_step/sec: 231.299 INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec) INFO:tensorflow:loss = 2.2578738, step = 800 (0.433 sec) INFO:tensorflow:global_step/sec: 657.044 INFO:tensorflow:global_step/sec: 657.044 INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec) INFO:tensorflow:loss = 2.2344787, step = 900 (0.150 sec) INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938... INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 938... INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Saving checkpoints for 938 into /tmp/multiworker/model.ckpt. INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938... INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 938... INFO:tensorflow:Calling model_fn. INFO:tensorflow:Calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Done calling model_fn. INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56 INFO:tensorflow:Starting evaluation at 2022-01-26T05:29:56 INFO:tensorflow:Graph was finalized. INFO:tensorflow:Graph was finalized. INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Restoring parameters from /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Running local_init_op. INFO:tensorflow:Running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Done running local_init_op. INFO:tensorflow:Evaluation [10/100] INFO:tensorflow:Evaluation [10/100] INFO:tensorflow:Evaluation [20/100] INFO:tensorflow:Evaluation [20/100] INFO:tensorflow:Evaluation [30/100] INFO:tensorflow:Evaluation [30/100] INFO:tensorflow:Evaluation [40/100] INFO:tensorflow:Evaluation [40/100] INFO:tensorflow:Evaluation [50/100] INFO:tensorflow:Evaluation [50/100] INFO:tensorflow:Evaluation [60/100] INFO:tensorflow:Evaluation [60/100] INFO:tensorflow:Evaluation [70/100] INFO:tensorflow:Evaluation [70/100] INFO:tensorflow:Evaluation [80/100] INFO:tensorflow:Evaluation [80/100] INFO:tensorflow:Evaluation [90/100] INFO:tensorflow:Evaluation [90/100] INFO:tensorflow:Evaluation [100/100] INFO:tensorflow:Evaluation [100/100] INFO:tensorflow:Inference Time : 2.04637s INFO:tensorflow:Inference Time : 2.04637s INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58 INFO:tensorflow:Finished evaluation at 2022-01-26-05:29:58 INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131 INFO:tensorflow:Saving dict for global step 938: global_step = 938, loss = 2.234131 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Saving 'checkpoint_path' summary for global step 938: /tmp/multiworker/model.ckpt-938 INFO:tensorflow:Loss for final step: 1.10881. INFO:tensorflow:Loss for final step: 1.10881. ({'loss': 2.234131, 'global_step': 938}, [])
تحسين أداء التدريب
لديك الآن نموذج ومقدر قادر على تعدد العمال مدعوم من tf.distribute.Strategy
. يمكنك تجربة الأساليب التالية لتحسين أداء التدريب متعدد العمال:
- زيادة حجم الدُفعة: حجم الدُفعة المحدد هنا هو لكل وحدة معالجة رسومات. بشكل عام ، يُنصح باستخدام أكبر حجم للدفعة يناسب ذاكرة وحدة معالجة الرسومات.
- متغيرات Cast: صب المتغيرات على
tf.float
إن أمكن. يتضمن نموذج ResNet الرسمي مثالاً على كيفية القيام بذلك. استخدام الاتصال الجماعي: توفر
MultiWorkerMirroredStrategy
العديد من تطبيقات الاتصال الجماعي .- تقوم
RING
بتنفيذ مجموعات قائمة على الحلقات باستخدام gRPC كطبقة اتصال عبر المضيف. - يستخدم
NCCL
NCCL الخاص بـ Nvidia لتنفيذ المجموعات. -
AUTO
يؤجل الاختيار لوقت التشغيل.
يعتمد أفضل خيار للتنفيذ الجماعي على عدد وحدات معالجة الرسومات ونوعها والتوصيل البيني للشبكة في المجموعة. لتجاوز الاختيار التلقائي ، حدد قيمة صالحة لمعامل
communication
الخاصMultiWorkerMirroredStrategy
، على سبيل المثال ،communication=tf.distribute.experimental.CollectiveCommunication.NCCL
.- تقوم
قم بزيارة قسم الأداء في الدليل لمعرفة المزيد حول الاستراتيجيات والأدوات الأخرى التي يمكنك استخدامها لتحسين أداء نماذج TensorFlow.
أمثلة التعليمات البرمجية الأخرى
- مثال نهائي لتدريب العديد من العمال في نظام Tensorflow / النظام البيئي باستخدام قوالب Kubernetes. يبدأ هذا المثال بنموذج Keras ويحوله إلى مقدر باستخدام
tf.keras.estimator.model_to_estimator
API. - النماذج الرسمية ، يمكن تكوين العديد منها لتشغيل إستراتيجيات توزيع متعددة.