ترحيل تدريب متعدد العاملين على وحدة المعالجة المركزية / وحدة معالجة الرسومات

عرض على TensorFlow.org تشغيل في Google Colab عرض المصدر على جيثب تحميل دفتر

يوضح هذا الدليل كيفية ترحيل سير العمل التدريبي الموزع متعدد العمال من TensorFlow 1 إلى TensorFlow 2.

لأداء تدريب متعدد العمال باستخدام وحدات المعالجة المركزية / وحدات معالجة الرسومات:

يثبت

ابدأ ببعض عمليات الاستيراد الضرورية ومجموعة بيانات بسيطة لأغراض توضيحية:

# The notebook uses a dataset instance for `Model.fit` with
# `ParameterServerStrategy`, which depends on symbols in TF 2.7.
# Install a utility needed for this demonstration
!pip install portpicker

import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]

ستحتاج إلى متغير بيئة التكوين 'TF_CONFIG' للتدريب على أجهزة متعددة في TensorFlow. استخدم 'TF_CONFIG' لتحديد عناوين 'cluster' و 'task' . (تعرف على المزيد في دليل Distributed_training .)

import json
import os

tf_config = {
    'cluster': {
        'chief': ['localhost:11111'],
        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
        'ps': ['localhost:12121', 'localhost:13131'],
    },
    'task': {'type': 'chief', 'index': 0}
}

os.environ['TF_CONFIG'] = json.dumps(tf_config)

استخدم عبارة del لإزالة المتغير (ولكن في التدريب الواقعي متعدد العاملين في TensorFlow 1 ، لن تضطر إلى القيام بذلك):

del os.environ['TF_CONFIG']

TensorFlow 1: تدريب موزع متعدد العمال باستخدام واجهات برمجة تطبيقات tf.estimator

يوضح مقتطف الكود التالي سير العمل الأساسي للتدريب متعدد العمال في TF1: ستستخدم tf.estimator.Estimator و tf.estimator.TrainSpec و tf.estimator.EvalSpec و tf.estimator.train_and_evaluate API للتوزيع التدريب:

def _input_fn():
  return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)

def _eval_input_fn():
  return tf1.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).batch(1)

def _model_fn(features, labels, mode):
  logits = tf1.layers.Dense(1)(features)
  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
  optimizer = tf1.train.AdagradOptimizer(0.05)
  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
  return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
INFO:tensorflow:Using default config.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpvfb91q_5
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpvfb91q_5', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/training_util.py:401: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/adagrad.py:143: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpvfb91q_5/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 0.038075272, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 3...
INFO:tensorflow:Saving checkpoints for 3 into /tmp/tmpvfb91q_5/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 3...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-11-13T02:31:06
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpvfb91q_5/model.ckpt-3
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Inference Time : 0.13630s
INFO:tensorflow:Finished evaluation at 2021-11-13-02:31:06
INFO:tensorflow:Saving dict for global step 3: global_step = 3, loss = 0.005215075
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 3: /tmp/tmpvfb91q_5/model.ckpt-3
INFO:tensorflow:Loss for final step: 0.061832994.
({'loss': 0.005215075, 'global_step': 3}, [])

TensorFlow 2: تدريب متعدد العمال مع استراتيجيات التوزيع

في TensorFlow 2 ، يتم إجراء التدريب الموزع عبر العديد من العاملين باستخدام وحدات المعالجة المركزية ووحدات معالجة الرسومات ووحدات المعالجة المركزية عبر tf.distribute.Strategy s.

يوضح المثال التالي كيفية استخدام استراتيجيتين من هذا القبيل: tf.distribute.experimental.ParameterServerStrategy و tf.distribute.MultiWorkerMirroredStrategy ، وكلاهما مصمم لتدريب CPU / GPU مع العديد من العمال.

تستخدم ParameterServerStrategy منسقًا ( 'chief' ) ، مما يجعلها أكثر ملاءمة للبيئة في دفتر Colab المحمول هذا. ستستخدم بعض الأدوات المساعدة هنا لإعداد العناصر الداعمة الأساسية لتجربة قابلة للتشغيل هنا: ستنشئ كتلة قيد التشغيل ، حيث تُستخدم الخيوط لمحاكاة خوادم المعلمات ( 'ps' ) والعاملين ( 'worker' ) . لمزيد من المعلومات حول تدريب خادم المعلمات ، ارجع إلى تدريب خادم المعلمة باستخدام البرنامج التعليمي ParameterServerStrategy .

في هذا المثال ، حدد أولاً متغير البيئة 'TF_CONFIG' باستخدام tf.distribute.cluster_resolver.TFConfigClusterResolver لتوفير معلومات الكتلة. إذا كنت تستخدم نظام إدارة الكتلة للتدريب الموزع الخاص بك ، فتحقق مما إذا كان يوفر 'TF_CONFIG' بالفعل ، وفي هذه الحالة لا تحتاج إلى تعيين متغير البيئة هذا بشكل صريح. (تعرف على المزيد في قسم إعداد قسم متغير البيئة 'TF_CONFIG' في التدريب الموزع باستخدام دليل TensorFlow .)

# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker

chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]

# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
    'cluster': {
        'chief': ["localhost:%s" % chief_port],
        'worker': ["localhost:%s" % port for port in worker_ports],
        'ps':  ["localhost:%s" % port for port in ps_ports],
    },
    'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)

# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

بعد ذلك ، قم بإنشاء tf.distribute.Server s للعاملين وخوادم المعلمات واحدًا تلو الآخر:

# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4

for i in range(3):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="worker",
      task_index=i,
      config=worker_config)

for i in range(2):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="ps",
      task_index=i)

في التدريب الموزع في العالم الواقعي ، بدلاً من بدء تشغيل جميع tf.distribute.Server على المنسق ، ستستخدم أجهزة متعددة ، وستستخدم الأجهزة التي تم تعيينها كـ "worker" و "ps" (خوادم المعلمات) قم بتشغيل tf.distribute.Server . راجع قسم المجموعات في العالم الحقيقي في البرنامج التعليمي لتدريب خادم Parameter لمزيد من التفاصيل.

مع كل شيء جاهزًا ، قم بإنشاء كائن ParameterServerStrategy :

strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

بمجرد إنشاء كائن إستراتيجية ، حدد النموذج والمحسن والمتغيرات الأخرى ، واستدع ملف Keras Model.compile داخل Strategy.scope API لتوزيع التدريب. (ارجع إلى مستندات Strategy.scope API لمزيد من المعلومات.)

إذا كنت تفضل تخصيص التدريب الخاص بك عن طريق ، على سبيل المثال ، تحديد التمريرات الأمامية والخلفية ، فراجع التدريب مع قسم حلقة تدريب مخصصة في البرنامج التعليمي لتدريب خادم Parameter لمزيد من التفاصيل.

dataset = tf.data.Dataset.from_tensor_slices(
      (features, labels)).shuffle(10).repeat().batch(64)

eval_dataset = tf.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).repeat().batch(1)

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
  optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
  model.compile(optimizer, "mse")

model.fit(dataset, epochs=5, steps_per_epoch=10)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:453: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-11-13 02:31:09.110074: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-11-13 02:31:09.115349: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-11-13 02:31:09.117963: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 - 3s - loss: 7.4912 - 3s/epoch - 259ms/step
Epoch 2/5
10/10 - 0s - loss: 3.3420 - 43ms/epoch - 4ms/step
Epoch 3/5
10/10 - 0s - loss: 1.9022 - 44ms/epoch - 4ms/step
Epoch 4/5
10/10 - 0s - loss: 1.1536 - 42ms/epoch - 4ms/step
Epoch 5/5
10/10 - 0s - loss: 0.7208 - 43ms/epoch - 4ms/step
<keras.callbacks.History at 0x7f45d83f3a50>
model.evaluate(eval_dataset, steps=10, return_dict=True)
1/10 [==>...........................] - ETA: 11s - loss: 2.4114
2021-11-13 02:31:10.757780: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-11-13 02:31:10.910985: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 2s 38ms/step - loss: 3.8431
2021-11-13 02:31:11.053772: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
{'loss': 3.843122}

أدوات التقسيم ( tf.distribute.experimental.partitioners )

ParameterServerStrategy في TensorFlow 2 يدعم التقسيم المتغير ويقدم نفس أدوات التقسيم مثل TensorFlow 1 ، بأسماء أقل إرباكًا: - tf.compat.v1.variable_axis_size_partitioner -> tf.distribute.experimental.partitioners.MaxSizePartitioner : مقسم يحافظ على الحد الأقصى للحجم) . - tf.compat.v1.min_max_variable_partitioner -> tf.distribute.experimental.partitioners.MinSizePartitioner : مقسم يخصص الحد الأدنى للحجم لكل جزء. - tf.compat.v1.fixed_size_partitioner -> tf.distribute.experimental.partitioners.FixedShardsPartitioner : مقسم يخصص عددًا ثابتًا من الأجزاء.

بدلاً من ذلك ، يمكنك استخدام كائن MultiWorkerMirroredStrategy :

# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

يمكنك استبدال الإستراتيجية المستخدمة أعلاه بكائن MultiWorkerMirroredStrategy لإجراء تدريب باستخدام هذه الإستراتيجية.

كما هو الحال مع واجهات برمجة تطبيقات tf.estimator ، نظرًا لأن MultiWorkerMirroredStrategy هي استراتيجية متعددة العملاء ، فلا توجد طريقة سهلة لتشغيل التدريب الموزع في دفتر Colab المحمول هذا. لذلك ، يؤدي استبدال الكود أعلاه بهذه الإستراتيجية إلى تشغيل الأشياء محليًا. يوضح تدريب العمال المتعددين باستخدام Keras Model.fit / حلقة تدريب مخصصة كيفية تشغيل تدريب متعدد العمال مع إعداد متغير 'TF_CONFIG' ، مع عاملين على مضيف محلي في كولاب. من الناحية العملية ، يمكنك إنشاء عدة عمال على عناوين / منافذ IP خارجية ، واستخدام المتغير 'TF_CONFIG' لتحديد تكوين الكتلة لكل عامل.

الخطوات التالية

لمعرفة المزيد حول التدريب الموزع متعدد العاملين باستخدام tf.distribute.experimental.ParameterServerStrategy و tf.distribute.MultiWorkerMirroredStrategy في TensorFlow 2 ، ضع في اعتبارك الموارد التالية: