Пользовательский цикл обучения с Keras и MultiWorkerMirroredStrategy

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

Обзор

В этом руководстве демонстрируется обучение нескольких рабочих с помощью API пользовательского цикла обучения, распространяемого через MultiWorkerMirroredStrategy, поэтому модель Keras, предназначенная для работы с одним рабочим, может беспрепятственно работать на нескольких рабочих с минимальным изменением кода.

Мы используем настраиваемые циклы обучения для обучения нашей модели, потому что они дают нам гибкость и больший контроль над обучением. Кроме того, легче отладить модель и цикл обучения. Более подробная информация доступна в разделе Написание обучающего цикла с нуля .

Если вы ищете, как использовать MultiWorkerMirroredStrategy с keras model.fit , обратитесь к этому руководству .

Руководство по распределенному обучению TensorFlow доступно для обзора стратегий распространения, поддерживаемых TensorFlow, для тех, кто заинтересован в более глубоком понимании API tf.distribute.Strategy .

Настраивать

Во-первых, необходимый импорт.

import json
import os
import sys

Перед импортом TensorFlow внесите несколько изменений в среду.

Отключите все графические процессоры. Это предотвращает ошибки, вызванные тем, что все рабочие процессы пытаются использовать один и тот же графический процессор. Для реального приложения каждый работник будет на другой машине.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Сбросьте переменную окружения TF_CONFIG , об этом вы узнаете позже.

os.environ.pop('TF_CONFIG', None)

Убедитесь, что текущий каталог находится на пути Python. Это позволяет блокноту позже импортировать файлы, записанные с помощью %%writefile .

if '.' not in sys.path:
  sys.path.insert(0, '.')

Теперь импортируйте TensorFlow.

import tensorflow as tf

Набор данных и определение модели

Затем создайте файл mnist.py с простой настройкой модели и набора данных. Этот файл python будет использоваться рабочими процессами в этом руководстве:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

Многопользовательская конфигурация

Теперь давайте войдем в мир обучения нескольких сотрудников. В TF_CONFIG переменная среды TF_CONFIG необходима для обучения на нескольких машинах, каждая из которых может иметь разную роль. TF_CONFIG , используемый ниже, представляет собой строку JSON, используемую для указания конфигурации кластера для каждого рабочего процесса, который является частью кластера. Это метод по умолчанию для указания кластера с использованием cluster_resolver.TFConfigClusterResolver , но в модуле distribute.cluster_resolver доступны и другие параметры.

Опишите свой кластер

Вот пример конфигурации:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Вот тот же TF_CONFIG , сериализованный как строка JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Есть два компонента TF_CONFIG : cluster и task .

  • cluster одинаков для всех рабочих и предоставляет информацию об обучающем кластере, который представляет собой набор, состоящий из различных типов заданий, таких как worker . При обучении нескольких рабочих с помощью MultiWorkerMirroredStrategy обычно есть один worker , который берет на себя немного больше обязанностей, таких как сохранение контрольной точки и написание сводного файла для TensorBoard, в дополнение к тому, что делает обычный worker . Такой воркер называется chief воркером, и принято, что worker с index 0 назначается главным worker (собственно так и реализован tf.distribute.Strategy ).

  • task предоставляет информацию о текущей задаче и отличается для каждого работника. Он указывает type и index этого работника.

В этом примере вы устанавливаете type задачи "worker" , а index задачи — 0 . Эта машина является первым рабочим и будет назначена главным рабочим и будет выполнять больше работы, чем другие. Обратите внимание, что на других машинах также должна быть установлена ​​переменная среды TF_CONFIG , и она должна иметь тот же cluster словарь, но другой type задачи или index задачи в зависимости от ролей этих машин.

В целях иллюстрации в этом руководстве показано, как можно установить TF_CONFIG с двумя рабочими процессами на localhost хосте. На практике пользователи должны создавать несколько рабочих процессов на внешних IP-адресах/портах и ​​соответствующим образом устанавливать TF_CONFIG для каждого рабочего процесса.

В этом примере вы будете использовать 2 воркера, TF_CONFIG первого TF_CONFIG показан выше. Для второго рабочего вы должны установить tf_config['task']['index']=1

Выше tf_config — это просто локальная переменная в python. Чтобы фактически использовать его для настройки обучения, этот словарь необходимо сериализовать как JSON и поместить в переменную среды TF_CONFIG .

Переменные среды и подпроцессы в блокнотах

Подпроцессы наследуют переменные среды от своего родителя. Итак, если вы установите переменную среды в этом jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Вы можете получить доступ к переменной среды из подпроцессов:

echo ${GREETINGS}
Hello TensorFlow!

В следующем разделе вы будете использовать это для передачи TF_CONFIG рабочим подпроцессам. На самом деле вы никогда не запустите свои задания таким образом, но этого достаточно для целей этого руководства: продемонстрировать минимальный пример с несколькими рабочими.

MultiWorkerMirroredСтратегии

Для обучения модели используйте экземпляр tf.distribute.MultiWorkerMirroredStrategy , который создает копии всех переменных в слоях модели на каждом устройстве для всех рабочих процессов. Руководство tf.distribute.Strategy содержит более подробную информацию об этой стратегии.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

Используйте tf.distribute.Strategy.scope , чтобы указать, что стратегия должна использоваться при построении вашей модели. Это помещает вас в « контекст между репликами » для этой стратегии, что означает, что стратегия получает контроль над такими вещами, как размещение переменных.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

Автоматическое разделение ваших данных между работниками

При обучении нескольких рабочих сегментирование набора данных не обязательно требуется, однако оно дает вам семантику ровно один раз, что делает дальнейшее обучение более воспроизводимым, т. е. обучение на нескольких рабочих должно быть таким же, как обучение на одном рабочем. Примечание. В некоторых случаях производительность может быть снижена.

См.: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

Определите пользовательский цикл обучения и обучите модель

Укажите оптимизатор

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

Определите шаг обучения с помощью tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Сохранение и восстановление контрольной точки

Реализация контрольных точек в пользовательском цикле обучения требует, чтобы пользователь обрабатывал ее вместо использования обратного вызова keras. Это позволяет вам сохранять веса модели и восстанавливать их, не сохраняя всю модель.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

Здесь вы создадите одну tf.train.Checkpoint , которая отслеживает модель, которой управляет tf.train.CheckpointManager , так что сохраняется только последняя контрольная точка.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Теперь, когда вам нужно восстановить, вы можете найти последнюю сохраненную контрольную точку с помощью удобной функции tf.train.latest_checkpoint .

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

После восстановления контрольной точки вы можете продолжить обучение индивидуальному циклу обучения.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.849107, train_loss: 0.491886.
Epoch: 1, accuracy: 0.937835, train_loss: 0.197650.
Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.

Полная настройка кода на воркерах

Чтобы на самом деле работать с MultiWorkerMirroredStrategy , вам нужно запустить рабочие процессы и передать им TF_CONFIG .

Как и файл mnist.py , написанный ранее, вот main.py , который содержит тот же код, который мы шаг за шагом проходили ранее в этом совместном проекте, мы просто записываем его в файл, чтобы каждый из рабочих выполнял его:

Файл: main.py

Writing main.py

Тренируйтесь и оценивайте

Текущий каталог теперь содержит оба файла Python:

ls *.py
main.py
mnist.py

Итак, json-сериализуйте TF_CONFIG и добавьте его в переменные среды:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Теперь вы можете запустить рабочий процесс, который будет запускать main.py и использовать TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Есть несколько замечаний по поводу приведенной выше команды:

  1. Он использует %%bash , который является «волшебством» ноутбука для запуска некоторых команд bash.
  2. Он использует флаг --bg для запуска процесса bash в фоновом режиме, потому что этот рабочий процесс не завершится. Он ждет всех рабочих перед тем, как начать.

Фоновый рабочий процесс не будет печатать вывод в этот блокнот, поэтому &> перенаправляет вывод в файл, чтобы вы могли видеть, что произошло.

Итак, подождите несколько секунд, пока процесс запустится:

import time
time.sleep(20)

Теперь посмотрите, что было выведено в файл журнала рабочего процесса:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

В последней строке файла журнала должно быть написано: Started server with target: grpc://localhost:12345 . Первый рабочий теперь готов и ждет, пока все остальные рабочие будут готовы продолжить работу.

Итак, обновите tf_config для второго рабочего процесса:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Теперь запустите второго рабочего. Это запустит обучение, так как все рабочие активны (поэтому нет необходимости фонировать этот процесс):

python main.py > /dev/null 2>&1

Теперь, если вы перепроверите журналы, написанные первым рабочим, вы увидите, что он участвовал в обучении этой модели:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.832589, train_loss: 0.531260.
Epoch: 1, accuracy: 0.936161, train_loss: 0.214774.
Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Углубленное обучение нескольких сотрудников

В этом руководстве продемонстрирован рабочий процесс Custom Training Loop для настройки с несколькими работниками. Подробное описание других тем доступно в руководстве model.fit's guide настройке нескольких рабочих и применимо к CTL.

Смотрите также

  1. В руководстве по распределенному обучению TensorFlow представлен обзор доступных стратегий распространения.
  2. Официальные модели , многие из которых можно настроить для запуска нескольких стратегий распространения.
  3. Раздел « Производительность» в руководстве содержит информацию о других стратегиях и инструментах , которые вы можете использовать для оптимизации производительности ваших моделей TensorFlow.