حلقه آموزش سفارشی با Keras و MultiWorkerMirroredStrategy

مشاهده در TensorFlow.org در Google Colab اجرا شود مشاهده منبع در GitHub دانلود دفترچه یادداشت

بررسی اجمالی

این آموزش آموزش چند کارگری را با API حلقه آموزشی سفارشی نشان می‌دهد که از طریق MultiWorkerMirroredStrategy توزیع شده است، بنابراین یک مدل Keras که برای اجرا روی تک‌کارگر طراحی شده است می‌تواند به طور یکپارچه روی چندین کارگر با حداقل تغییر کد کار کند.

ما از حلقه‌های آموزشی سفارشی برای آموزش مدل خود استفاده می‌کنیم، زیرا آنها انعطاف‌پذیری و کنترل بیشتری در آموزش به ما می‌دهند. علاوه بر این، اشکال زدایی مدل و حلقه آموزشی آسان تر است. اطلاعات دقیق تر در نوشتن یک حلقه آموزشی از ابتدا موجود است.

اگر به دنبال نحوه استفاده از MultiWorkerMirroredStrategy با keras model.fit ، به جای آن به این آموزش مراجعه کنید.

راهنمای آموزش توزیع‌شده در TensorFlow برای مروری بر استراتژی‌های توزیعی که TensorFlow از آن‌ها پشتیبانی می‌کند برای کسانی که علاقه‌مند به درک عمیق‌تر از tf.distribute.Strategy APIها هستند، در دسترس است.

برپایی

ابتدا مقداری واردات ضروری.

import json
import os
import sys

قبل از وارد کردن TensorFlow، چند تغییر در محیط ایجاد کنید.

تمام پردازنده های گرافیکی را غیر فعال کنید. این از خطاهای ناشی از تلاش کارگران برای استفاده از یک GPU جلوگیری می کند. برای یک کاربرد واقعی، هر کارگر روی یک ماشین متفاوت خواهد بود.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

متغیر محیطی TF_CONFIG را بازنشانی کنید، بعداً در این مورد بیشتر خواهید دید.

os.environ.pop('TF_CONFIG', None)

مطمئن شوید که دایرکتوری فعلی در مسیر پایتون است. این به نوت بوک اجازه می دهد تا فایل های نوشته شده توسط %%writefile بعدا وارد کند.

if '.' not in sys.path:
  sys.path.insert(0, '.')

اکنون TensorFlow را وارد کنید.

import tensorflow as tf

مجموعه داده و تعریف مدل

سپس یک فایل mnist.py با یک مدل ساده و تنظیمات مجموعه ایجاد کنید. این فایل پایتون توسط worker-processes در این آموزش استفاده خواهد شد:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

پیکربندی چند کارگری

حالا بیایید وارد دنیای آموزش چندکاره شویم. در TensorFlow، متغیر محیطی TF_CONFIG برای آموزش روی چندین ماشین مورد نیاز است، که احتمالاً هر کدام نقش متفاوتی دارند. TF_CONFIG که در زیر استفاده می شود، یک رشته JSON است که برای تعیین پیکربندی خوشه در هر کارگر که بخشی از خوشه است استفاده می شود. این روش پیش‌فرض برای تعیین یک خوشه با استفاده از cluster_resolver.TFConfigClusterResolver است، اما گزینه‌های دیگری در ماژول distribute.cluster_resolver وجود دارد.

خوشه خود را توصیف کنید

در اینجا یک نمونه پیکربندی است:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

در اینجا همان TF_CONFIG است که به عنوان یک رشته JSON سریال شده است:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

دو جزء TF_CONFIG وجود دارد: cluster و task .

  • cluster برای همه کارگران یکسان است و اطلاعاتی در مورد خوشه آموزشی ارائه می دهد که دستوری است از انواع مشاغل مانند worker . در آموزش چند کارگری با MultiWorkerMirroredStrategy ، معمولاً یک worker وجود دارد که علاوه بر کاری که یک worker معمولی انجام می دهد، کمی مسئولیت بیشتری مانند ذخیره ایست بازرسی و نوشتن فایل خلاصه برای TensorBoard بر عهده می گیرد. از چنین کارگری به عنوان کارگر chief یاد می شود و مرسوم است که worker با index 0 به عنوان worker اصلی منصوب می شود (در واقع به این ترتیب tf.distribute.Strategy اجرا می شود).

  • task اطلاعات وظیفه فعلی را ارائه می دهد و برای هر کارگر متفاوت است. type و index آن کارگر را مشخص می کند.

در این مثال، type وظیفه را روی "worker" و index وظیفه را 0 تنظیم می کنید. این دستگاه اولین کارگر است و به عنوان سرکار منصوب می شود و بیشتر از بقیه کار می کند. توجه داشته باشید که ماشین‌های دیگر باید مجموعه متغیر محیطی TF_CONFIG را نیز داشته باشند، و باید دارای دیکت cluster یکسان باشد، اما بسته به نقش آن ماشین‌ها، type کار یا index کار متفاوت است.

برای اهداف تصویری، این آموزش نشان می دهد که چگونه می توان یک TF_CONFIG با 2 کارگر در localhost هاست تنظیم کرد. در عمل، کاربران چندین کارگر را روی آدرس‌ها/پورت‌های IP خارجی ایجاد می‌کنند و TF_CONFIG را روی هر کارگر به طور مناسب تنظیم می‌کنند.

در این مثال شما از 2 کارگر استفاده خواهید کرد، TF_CONFIG اولین کارگر در بالا نشان داده شده است. برای کارگر دوم tf_config['task']['index']=1 تنظیم کنید

در بالا، tf_config فقط یک متغیر محلی در پایتون است. برای استفاده واقعی از آن برای پیکربندی آموزش، این فرهنگ لغت باید به صورت JSON سریال شود و در متغیر محیطی TF_CONFIG قرار گیرد.

متغیرهای محیطی و فرآیندهای فرعی در نوت بوک ها

فرآیندهای فرعی متغیرهای محیطی را از والد خود به ارث می برند. بنابراین اگر یک متغیر محیطی را در این فرآیند jupyter notebook کنید:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

شما می توانید از یک زیر فرآیند به متغیر محیطی دسترسی پیدا کنید:

echo ${GREETINGS}
Hello TensorFlow!

در بخش بعدی، از این برای ارسال TF_CONFIG به زیرفرایندهای کارگر استفاده خواهید کرد. شما هرگز واقعاً مشاغل خود را به این روش راه اندازی نمی کنید، اما برای اهداف این آموزش کافی است: برای نشان دادن یک نمونه حداقل چند کارگری.

MultiWorkerMirroredStrategy

برای آموزش مدل، از نمونه‌ای از tf.distribute.MultiWorkerMirroredStrategy استفاده کنید، که کپی‌هایی از همه متغیرها در لایه‌های مدل در هر دستگاه در همه کارگران ایجاد می‌کند. راهنمای tf.distribute.Strategy جزئیات بیشتری در مورد این استراتژی دارد.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

از tf.distribute.Strategy.scope استفاده کنید تا مشخص کنید که هنگام ساخت مدل شما باید از یک استراتژی استفاده شود. این شما را در " زمینه تکرار متقابل " برای این استراتژی قرار می دهد، به این معنی که استراتژی کنترل چیزهایی مانند قرار دادن متغیرها را در اختیار دارد.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

به طور خودکار داده های خود را بین کارگران تقسیم کنید

در آموزش چند کارگری، به اشتراک گذاری مجموعه داده لزوماً نیازی نیست، با این حال معنایی دقیقاً یک بار به شما می دهد که باعث می شود آموزش بیشتر تکرار شود، یعنی آموزش روی چند کارگر باید مانند آموزش روی یک کارگر باشد. توجه: عملکرد ممکن است در برخی موارد تحت تأثیر قرار گیرد.

ببینید: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

حلقه آموزش سفارشی را تعریف کنید و مدل را آموزش دهید

یک بهینه ساز را مشخص کنید

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

یک مرحله آموزشی با tf.function تعریف کنید

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

ذخیره و بازیابی ایست بازرسی

اجرای Checkpointing در یک حلقه آموزشی سفارشی به کاربر نیاز دارد که به جای استفاده از پاسخ به تماس keras، آن را مدیریت کند. این به شما امکان می دهد وزن های مدل را ذخیره کنید و آنها را بدون نیاز به ذخیره کل مدل بازیابی کنید.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

در اینجا، یک tf.train.Checkpoint ایجاد می‌کنید که مدل را ردیابی می‌کند، که توسط tf.train.CheckpointManager مدیریت می‌شود تا فقط آخرین پست بازرسی حفظ شود.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

اکنون، زمانی که نیاز به بازیابی دارید، می‌توانید آخرین پست بازرسی ذخیره شده را با استفاده از عملکرد راحت tf.train.latest_checkpoint کنید.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

پس از بازگرداندن چک پوینت، می توانید به آموزش حلقه آموزشی سفارشی خود ادامه دهید.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.849107, train_loss: 0.491886.
Epoch: 1, accuracy: 0.937835, train_loss: 0.197650.
Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.

راه اندازی کد کامل روی کارگران

برای اجرای واقعی با MultiWorkerMirroredStrategy ، باید فرآیندهای کارگر را اجرا کنید و یک TF_CONFIG به آنها ارسال کنید.

مانند فایل mnist.py که قبلا نوشته شده است، در اینجا main.py است که حاوی همان کدی است که قبلاً در این colab قدم به قدم آن را طی کردیم، ما فقط آن را در یک فایل می نویسیم تا هر یک از کارگران آن را اجرا کنند:

فایل: main.py

Writing main.py

آموزش دهید و ارزیابی کنید

فهرست فعلی شامل هر دو فایل پایتون است:

ls *.py
main.py
mnist.py

بنابراین TF_CONFIG را json سریال کنید و آن را به متغیرهای محیط اضافه کنید:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

اکنون، می توانید یک فرآیند worker را راه اندازی کنید که main.py را اجرا می کند و از main.py استفاده می TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

در مورد دستور بالا باید به چند نکته توجه کرد:

  1. از %%bash که یک نوت بوک "جادویی" است برای اجرای برخی از دستورات bash استفاده می کند.
  2. از پرچم --bg برای اجرای فرآیند bash در پس‌زمینه استفاده می‌کند، زیرا این کارگر خاتمه نمی‌یابد. قبل از شروع کار منتظر همه کارگران است.

فرآیند کارگر پس‌زمینه خروجی را در این نوت‌بوک چاپ نمی‌کند، بنابراین &> خروجی خود را به یک فایل هدایت می‌کند، بنابراین می‌توانید ببینید چه اتفاقی افتاده است.

بنابراین، چند ثانیه صبر کنید تا فرآیند شروع شود:

import time
time.sleep(20)

حالا ببینید چه چیزی تا کنون به فایل لاگ کارگر خروجی داده شده است:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

خط آخر فایل لاگ باید بگوید: Started server with target: grpc://localhost:12345 . اولین کارگر اکنون آماده است و منتظر است تا همه کارگران دیگر آماده ادامه کار باشند.

بنابراین tf_config را برای پردازش دومین کارگر به‌روزرسانی کنید:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

حالا کارگر دوم را راه اندازی کنید. این آموزش را آغاز می کند زیرا همه کارگران فعال هستند (بنابراین نیازی به پیشینه این فرآیند نیست):

python main.py > /dev/null 2>&1

حال اگر گزارش های نوشته شده توسط اولین کارگر را دوباره بررسی کنید، خواهید دید که در آموزش آن مدل شرکت کرده است:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.832589, train_loss: 0.531260.
Epoch: 1, accuracy: 0.936161, train_loss: 0.214774.
Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

آموزش عمیق چند کارگری

این آموزش یک گردش کار Custom Training Loop از راه اندازی چند کارگری را نشان می دهد. شرح مفصلی از موضوعات دیگر در model.fit's guide مورد راه اندازی چندکاره موجود است و برای CTL ها قابل اجرا است.

همچنین ببینید

  1. راهنمای آموزش توزیع شده در TensorFlow نمای کلی از استراتژی های توزیع موجود را ارائه می دهد.
  2. مدل‌های رسمی ، که بسیاری از آنها را می‌توان برای اجرای چندین استراتژی توزیع پیکربندی کرد.
  3. بخش Performance در راهنما اطلاعاتی در مورد سایر استراتژی ها و ابزارهایی که می توانید برای بهینه سازی عملکرد مدل های TensorFlow خود استفاده کنید، ارائه می دهد.