Обучение сервера параметров с помощью ParameterServerStrategy

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

Обзор

Обучение сервера параметров — это распространенный метод параллельного анализа данных для масштабирования обучения модели на нескольких машинах.

Учебный кластер сервера параметров состоит из рабочих процессов и серверов параметров . Переменные создаются на серверах параметров, и они считываются и обновляются работниками на каждом этапе. По умолчанию рабочие процессы читают и обновляют эти переменные независимо друг от друга, не синхронизируясь друг с другом. Вот почему иногда обучение параметров в стиле сервера называют асинхронным обучением .

В TensorFlow 2 обучение сервера параметров обеспечивается классом tf.distribute.experimental.ParameterServerStrategy , который распределяет шаги обучения по кластеру, который масштабируется до тысяч рабочих (в сопровождении серверов параметров).

Поддерживаемые методы обучения

Существует два основных поддерживаемых метода обучения:

Кластер с заданиями и задачами

Вне зависимости от выбранного API ( Model.fit или настраиваемый цикл обучения), распределенное обучение в TensorFlow 2 предполагает: 'cluster' с несколькими 'jobs' , причем у каждого из заданий может быть одна или несколько 'tasks' .

При использовании обучения сервера параметров рекомендуется иметь:

  • Одна должность координатора (имеющая название должности chief )
  • Несколько рабочих мест (название рабочего места worker ); и
  • Несколько заданий сервера параметров (имя задания ps )

В то время как координатор создает ресурсы, отправляет обучающие задачи, записывает контрольные точки и обрабатывает сбои задач, рабочие процессы и серверы параметров запускают tf.distribute.Server , который прослушивает запросы от координатора.

Обучение сервера параметров с помощью Model.fit API

Для обучения сервера параметров с помощью API Model.fit координатор должен использовать объект tf.distribute.experimental.ParameterServerStrategy и tf.keras.utils.experimental.DatasetCreator в качестве входных данных. Подобно использованию Model.fit без стратегии или с другими стратегиями, рабочий процесс включает создание и компиляцию модели, подготовку обратных вызовов, за которыми следует вызов Model.fit .

Обучение сервера параметров с пользовательским циклом обучения

В настраиваемых циклах обучения класс tf.distribute.experimental.coordinator.ClusterCoordinator является ключевым компонентом, используемым для координатора.

Самый важный API, предоставляемый объектом ClusterCoordinator , — это schedule :

  • API schedule ставит в очередь функцию tf.function и немедленно возвращает RemoteValue значение будущего.
  • Функции в очереди будут отправлены удаленным работникам в фоновых потоках, а их RemoteValue будут заполнены асинхронно.
  • Поскольку schedule не требует назначения рабочего процесса, переданная tf.function может быть выполнена на любом доступном рабочем процессе.
  • Если рабочий процесс, на котором он выполняется, становится недоступным до его завершения, функция будет повторно запущена на другом доступном рабочем потоке.
  • Из-за этого факта и того факта, что выполнение функции не является атомарным, функция может выполняться более одного раза.

В дополнение к диспетчеризации удаленных функций ClusterCoordinator также помогает создавать наборы данных для всех рабочих процессов и перестраивать эти наборы данных, когда рабочий процесс восстанавливается после сбоя.

Настройка учебника

Учебное пособие будет разветвлено на Model.fit и пользовательские пути цикла обучения, и вы сможете выбрать тот, который соответствует вашим потребностям. Разделы, отличные от «Обучение с X», применимы к обоим путям.

pip install portpicker

Настройка кластера

Как упоминалось выше, для обучающего кластера сервера параметров требуется задача-координатор, которая запускает вашу обучающую программу, одна или несколько рабочих задач и задач сервера параметров, которые запускают серверы TensorFlow — tf.distribute.Server — и, возможно, дополнительная задача оценки, которая запускает дополнительную оценку. (см. раздел оценки коляски ниже). К их установке предъявляются следующие требования:

  • Задача координатора должна знать адреса и порты всех остальных серверов TensorFlow, кроме оценщика.
  • Рабочие процессы и серверы параметров должны знать, какой порт им нужно прослушивать. Для простоты вы обычно можете передавать полную информацию о кластере при создании серверов TensorFlow для этих задач.
  • Задача оценщика не должна знать настройку обучающего кластера. Если это так, он не должен пытаться подключиться к обучающему кластеру.
  • Рабочие процессы и серверы параметров должны иметь типы задач "worker" и "ps" соответственно. Координатор должен использовать "chief" в качестве типа задачи по устаревшим причинам.

В этом руководстве вы создадите внутрипроцессный кластер, чтобы все обучение сервера параметров можно было запустить в Colab. В следующем разделе вы узнаете, как настроить настоящие кластеры .

Внутрипроцессный кластер

Вы начнете с создания нескольких серверов TensorFlow заранее и подключитесь к ним позже. Обратите внимание, что это только для целей демонстрации этого руководства, и в реальном обучении серверы будут запускаться на машинах "worker" и "ps" .

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

Внутрипроцессная настройка кластера часто используется при модульном тестировании, например здесь .

Другой вариант локального тестирования — запустить процессы на локальном компьютере — пример такого подхода см. в разделе Обучение нескольких рабочих с помощью Keras .

Создание экземпляра ParameterServerStrategy

Прежде чем погрузиться в обучающий код, давайте создадим экземпляр объекта ParameterServerStrategy . Обратите внимание, что это необходимо независимо от того, используете ли вы Model.fit или настраиваемый цикл обучения. Аргумент variable_partitioner будет объяснен в разделе Шардинг переменных .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

Чтобы использовать графические процессоры для обучения, выделите графические процессоры, видимые для каждого рабочего процесса. ParameterServerStrategy будет использовать все доступные графические процессоры для каждого рабочего процесса с тем ограничением, что все рабочие процессы должны иметь одинаковое количество доступных графических процессоров.

Шардинг переменных

Разделение переменных означает разделение переменной на несколько меньших переменных, которые называются сегментами . Шардинг переменных может быть полезен для распределения сетевой нагрузки при доступе к этим сегментам. Также полезно распределить вычисление и хранение нормальной переменной между несколькими серверами параметров.

Чтобы включить сегментирование переменных, вы можете передать variable_partitioner при создании объекта ParameterServerStrategy . variable_partitioner будет вызываться каждый раз, когда создается переменная, и ожидается, что он вернет количество осколков по каждому измерению переменной. Предоставляются некоторые готовые variable_partitioner , такие как tf.distribute.experimental.partitioners.MinSizePartitioner . Рекомендуется использовать разделители на основе размера, такие как tf.distribute.experimental.partitioners.MinSizePartitioner , чтобы избежать разделения небольших переменных, что может негативно сказаться на скорости обучения модели.

Когда variable_partitioner передается и если вы создаете переменную непосредственно в strategy.scope() , она станет типом контейнера со свойством variables , которое обеспечивает доступ к списку осколков. В большинстве случаев этот контейнер будет автоматически преобразован в тензор путем объединения всех осколков. В результате его можно использовать как обычную переменную. С другой стороны, некоторые методы TensorFlow, такие как tf.nn.embedding_lookup , обеспечивают эффективную реализацию для этого типа контейнера, и в этих методах можно избежать автоматической конкатенации.

Дополнительные сведения см. в документации API tf.distribute.experimental.ParameterServerStrategy .

Тренировка с Model.fit

Keras предоставляет простой в использовании API для обучения через Model.fit , который обрабатывает цикл обучения под капотом, с гибкостью переопределяемого train_step и обратными вызовами, которые предоставляют такие функции, как сохранение контрольных точек или сохранение сводки для TensorBoard. С Model.fit тот же обучающий код можно использовать для других стратегий с простой заменой объекта стратегии.

Входные данные

Model.fit с обучением сервера параметров требует, чтобы входные данные были предоставлены в вызываемом объекте, который принимает один аргумент типа tf.distribute.InputContext и возвращает tf.data.Dataset . Затем создайте объект tf.keras.utils.experimental.DatasetCreator , который принимает такой callable , и необязательный объект tf.distribute.InputOptions через аргумент input_options .

Обратите внимание, что рекомендуется перемешивать и повторять данные с параметрами обучения сервера, а также указывать steps_per_epoch в вызове fit , чтобы библиотека знала границы эпох.

Дополнительные сведения об аргументе InputContext см. в руководстве по распределенному вводу .

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

Код в dataset_fn будет вызываться на устройстве ввода, которым обычно является ЦП, на каждой из рабочих машин.

Построение и компиляция модели

Теперь вы создадите tf.keras.Model — тривиальную модель tf.keras.models.Sequential для демонстрационных целей — с последующим вызовом Model.compile для включения компонентов, таких как оптимизатор, метрики или параметры, такие как steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Обратные звонки и обучение

Прежде чем вызывать model.fit для фактического обучения, давайте подготовим необходимые обратные вызовы для общих задач, таких как:

  • ModelCheckpoint : для сохранения веса модели.
  • BackupAndRestore : чтобы убедиться, что ход обучения автоматически резервируется и восстанавливается, если кластер становится недоступным (например, прерывание или вытеснение); или
  • TensorBoard : для сохранения отчетов о ходе выполнения в сводные файлы, которые визуализируются в инструменте TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f89984ca890>

Прямое использование с ClusterCoordinator (необязательно)

Даже если вы выберете путь обучения Model.fit , вы можете дополнительно создать экземпляр объекта tf.distribute.experimental.coordinator.ClusterCoordinator , чтобы запланировать другие функции, которые вы хотели бы выполнять на рабочих процессах. Дополнительную информацию и примеры см. в разделе « Тренировка с пользовательским циклом обучения ».

Обучение с пользовательским циклом обучения

Использование настраиваемых циклов обучения с tf.distribute.Strategy обеспечивает большую гибкость при определении циклов обучения. С ParameterServerStrategy , определенным выше (как strategy ), вы будете использовать tf.distribute.experimental.coordinator.ClusterCoordinator для отправки выполнения шагов обучения удаленным работникам.

Затем вы создадите модель, определите набор данных и ступенчатую функцию, как вы делали это в цикле обучения с другими tf.distribute.Strategy s. Вы можете найти более подробную информацию в учебнике Custom training with tf.distribute.Strategy .

Чтобы обеспечить эффективную предварительную выборку наборов данных, используйте рекомендуемые API-интерфейсы для создания распределенных наборов данных, упомянутые в разделе « Этапы обучения отправки удаленным работникам » ниже. Кроме того, обязательно вызовите Strategy.run внутри worker_fn , чтобы в полной мере использовать графические процессоры, выделенные для рабочих процессов. Остальные шаги одинаковы для обучения с использованием графических процессоров или без них.

Давайте создадим эти компоненты, выполнив следующие шаги:

Настройте данные

Во-первых, напишите функцию, которая создает набор данных, включающий логику предварительной обработки, реализованную слоями предварительной обработки Keras .

Вы создадите эти слои вне dataset_fn но примените преобразование внутри dataset_fn , так как вы dataset_fn в tf.function , который не позволяет создавать переменные внутри него.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return bool(asarray(a1 == a2).all())

Создайте примеры игрушек в наборе данных:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Затем создайте обучающий набор данных, завернутый в dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Построить модель

Далее создайте модель и другие объекты. Обязательно создайте все переменные в strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

Давайте подтвердим, что использование FixedShardsPartitioner разделило все переменные на два сегмента, и каждый сегмент был назначен разным серверам параметров:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Определите этап обучения

В-третьих, создайте этап обучения, завернутый в tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

В приведенной выше функции шага обучения вызов Strategy.run и Strategy.reduce в step_fn может поддерживать несколько графических процессоров на одного рабочего. Если рабочим выделены графические процессоры, Strategy.run распределит наборы данных по нескольким репликам.

Отправка шагов обучения удаленным работникам

После того, как все вычисления определены ParameterServerStrategy , вы будете использовать класс tf.distribute.experimental.coordinator.ClusterCoordinator для создания ресурсов и распространения этапов обучения среди удаленных сотрудников.

Давайте сначала создадим объект ClusterCoordinator и передадим объект стратегии:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Затем создайте набор данных для каждого работника и итератор. В приведенном ниже per_worker_dataset_fn рекомендуется оборачивать dataset_fn в strategy.distribute_datasets_from_function , чтобы обеспечить беспрепятственную эффективную предварительную выборку на графические процессоры.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

Последним шагом является распределение вычислений между удаленными работниками с помощью ClusterCoordinator.schedule :

  • Метод schedule ставит в очередь функцию tf.function и немедленно возвращает удаленное значение, похожее на RemoteValue . Функции в очереди будут отправляться удаленным работникам в фоновых потоках, а RemoteValue будет заполняться асинхронно.
  • Метод join ( ClusterCoordinator.join ) можно использовать для ожидания выполнения всех запланированных функций.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.543750.
Finished epoch 1, accuracy is 0.543750.
Finished epoch 2, accuracy is 0.950000.
Finished epoch 3, accuracy is 1.000000.

Вот как вы можете получить результат RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

Как вариант, можно запустить все шаги и что-то делать, ожидая завершения:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Полный рабочий процесс обучения и обслуживания для этого конкретного примера см. в этом тесте .

Подробнее о создании набора данных

Набор данных в приведенном выше коде создается с помощью ClusterCoordinator.create_per_worker_dataset API). Он создает один набор данных для каждого работника и возвращает объект-контейнер. Вы можете вызвать для него метод iter , чтобы создать итератор для каждого рабочего. Итератор для каждого рабочего процесса содержит один итератор для каждого рабочего процесса, и соответствующий фрагмент рабочего процесса будет заменен во входном аргументе функции, переданной методу ClusterCoordinator.schedule , прежде чем функция будет выполнена на конкретном рабочем процессе.

В настоящее время метод ClusterCoordinator.schedule предполагает, что рабочие процессы эквивалентны, и, следовательно, предполагает, что наборы данных для разных рабочих процессов одинаковы, за исключением того, что они могут перемешиваться по-разному, если они содержат операцию Dataset.shuffle . Из-за этого также рекомендуется, чтобы наборы данных повторялись бесконечно, и вы планировали конечное количество шагов вместо того, чтобы полагаться на OutOfRangeError из набора данных.

Еще одно важное замечание: наборы данных tf.data не поддерживают неявную сериализацию и десериализацию за пределами задач. Поэтому важно создать весь набор данных внутри функции, переданной в ClusterCoordinator.create_per_worker_dataset .

Оценка

Существует несколько способов определения и запуска цикла оценки в распределенном обучении. Каждый из них имеет свои плюсы и минусы, как описано ниже. Метод встроенной оценки рекомендуется, если у вас нет предпочтений.

Встроенная оценка

В этом методе координатор чередует обучение и оценку, поэтому он называется встроенной оценкой .

Есть несколько преимуществ встроенной оценки. Например:

  • Он может поддерживать большие модели оценки и наборы данных оценки, которые не может удержать одна задача.
  • Результаты оценки могут быть использованы для принятия решений по обучению следующей эпохи.

Существует два способа реализации встроенной оценки: прямая оценка и распределенная оценка.

  • Прямая оценка : для небольших моделей и наборов данных для оценки координатор может запустить оценку непосредственно для распределенной модели с набором данных для оценки на координаторе:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Распределенная оценка . Для больших моделей или наборов данных, которые невозможно запустить непосредственно на координаторе, задача координатора может распределять задачи оценки среди рабочих с помощью методов ClusterCoordinator.schedule / ClusterCoordinator.join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

Оценка коляски

Другой метод называется дополнительной оценкой , когда вы создаете специальную задачу оценщика, которая неоднократно считывает контрольные точки и запускает оценку последней контрольной точки. Это позволяет завершить программу обучения раньше, если вам не нужно менять цикл обучения на основе результатов оценки. Однако для запуска оценки требуется дополнительная задача оценщика и периодические контрольные точки. Ниже приведен возможный цикл оценки боковой машины:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Кластеры в реальном мире

В реальной производственной среде вы будете запускать все задачи в разных процессах на разных машинах. Самый простой способ настроить информацию о кластере для каждой задачи — установить переменные среды "TF_CONFIG" и использовать tf.distribute.cluster_resolver.TFConfigClusterResolver для разбора "TF_CONFIG" .

Общее описание переменных среды "TF_CONFIG" см. в учебном руководстве по распределенной среде.

Если вы начинаете свои обучающие задачи, используя Kubernetes или другие шаблоны конфигурации, весьма вероятно, что эти шаблоны уже установили для вас “TF_CONFIG" .

Установите переменную среды "TF_CONFIG"

Предположим, у вас есть 3 воркера и 2 сервера параметров, "TF_CONFIG" 1 может быть:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

"TF_CONFIG" оценщика может быть:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

Часть "cluster" в приведенной выше "TF_CONFIG" для оценщика является необязательной.

Если вы используете один и тот же бинарник для всех задач

Если вы предпочитаете выполнять все эти задачи, используя один двоичный файл, вам нужно будет разрешить вашей программе ветвление на разные роли в самом начале:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

Следующий код запускает сервер TensorFlow и ожидает:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Обработка сбоя задачи

Сбой работника

tf.distribute.experimental.coordinator.ClusterCoordinator или Model.fit обеспечивают встроенную отказоустойчивость при сбое рабочего процесса. После восстановления рабочего процесса ранее предоставленная функция набора данных (либо ClusterCoordinator.create_per_worker_dataset для пользовательского цикла обучения, либо tf.keras.utils.experimental.DatasetCreator для Model.fit ) будет вызываться для рабочих процессов для повторного создания наборов данных.

Сбой сервера параметров или координатора

Однако, когда координатор увидит ошибку сервера параметров, он немедленно вызовет ошибку UnavailableError или AbortedError . В этом случае вы можете перезапустить координатор. Сам координатор также может стать недоступным. Поэтому рекомендуется определенный инвентарь, чтобы не потерять тренировочный прогресс:

  • Для Model.fit следует использовать обратный вызов BackupAndRestore , который автоматически обрабатывает сохранение и восстановление хода выполнения. Пример см. в разделе « Обратные вызовы и обучение » выше.

  • Для пользовательского цикла обучения следует периодически проверять переменные модели и загружать переменные модели из контрольной точки, если таковая имеется, перед началом обучения. О прогрессе обучения можно приблизительно судить по optimizer.iterations , если оптимизатор имеет контрольную точку:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Получение RemoteValue

Получение RemoteValue гарантированно будет успешным, если функция выполнена успешно. Это связано с тем, что в настоящее время возвращаемое значение сразу же копируется в координатор после выполнения функции. Если во время копирования произойдет сбой рабочего процесса, функция будет повторно применена к другому доступному рабочему процессу. Поэтому, если вы хотите оптимизировать производительность, вы можете планировать функции без возвращаемого значения.

Отчет об ошибках

Как только координатор увидит ошибку, такую ​​как UnavailableError от серверов параметров, или другие ошибки приложения, такие как InvalidArgument от tf.debugging.check_numerics , он отменит все ожидающие и поставленные в очередь функции, прежде чем вызвать ошибку. Извлечение их соответствующих RemoteValue вызовет CancelledError .

После возникновения ошибки координатор не будет вызывать ту же ошибку или какую-либо ошибку из отмененных функций.

Улучшение производительности

Существует несколько возможных причин возникновения проблем с производительностью при обучении с помощью ParameterServerStrategy и ClusterResolver .

Одной из распространенных причин является несбалансированная нагрузка на серверы параметров, а некоторые сильно загруженные серверы параметров исчерпали свои возможности. Также может быть несколько основных причин. Вот несколько простых способов смягчить эту проблему:

  1. Разделите большие переменные модели, указав variable_partitioner при построении ParameterServerStrategy .
  2. По возможности избегайте создания переменной точки доступа, которая требуется для всех серверов параметров, за один шаг. Например, используйте постоянную скорость обучения или подкласс tf.keras.optimizers.schedules.LearningRateSchedule в оптимизаторах, поскольку поведение по умолчанию заключается в том, что скорость обучения становится переменной, помещаемой на определенный сервер параметров и запрашиваемой всеми другими серверами параметров на каждом этапе. .
  3. Перемешайте свои большие словари, прежде чем передавать их слоям предварительной обработки Keras.

Еще одна возможная причина проблем с производительностью — координатор. Ваша первая реализация schedule / join основана на Python и, следовательно, может иметь накладные расходы на многопоточность. Также задержка между координатором и работниками может быть большой. Если это так,

  • Для Model.fit вы можете установить для аргумента steps_per_execution , предоставленного в Model.compile , значение больше 1.

  • Для пользовательского цикла обучения вы можете упаковать несколько шагов в одну tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Поскольку библиотека оптимизирована, мы надеемся, что большинству пользователей в будущем не придется вручную упаковывать шаги.

Кроме того, небольшая хитрость для повышения производительности заключается в том, чтобы запланировать функции без возвращаемого значения, как описано выше в разделе обработки сбоя задачи.

Известные ограничения

Большинство известных ограничений уже рассмотрены в предыдущих разделах. В этом разделе приводится сводка.

Общие ParameterServerStrategy

  • os.environment["grpc_fail_fast"]="use_caller" необходим для каждой задачи, включая координатора, чтобы обеспечить правильную работу отказоустойчивости.
  • Синхронное обучение сервера параметров не поддерживается.
  • Обычно необходимо упаковать несколько шагов в одну функцию для достижения оптимальной производительности.
  • Не поддерживается загрузка сохраненной_модели через tf.saved_model.load , содержащую сегментированные переменные. Обратите внимание, что загрузка такой save_model с помощью TensorFlow Serving должна работать.
  • Не поддерживается загрузка контрольной точки, содержащей сегментированные переменные слота оптимизатора, в другое количество сегментов.
  • Восстановление после сбоя сервера параметров без перезапуска задачи координатора не поддерживается.
  • Использование tf.lookup.StaticHashTable (который обычно используется некоторыми уровнями предварительной обработки Keras, такими как tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookup и tf.keras.layers.TextVectorization ) приводит к размещению ресурсов на координатор в это время с сервером параметров обучения. Это влияет на производительность при поиске RPC от рабочих к координатору. В настоящее время это является приоритетной задачей.

Model.fit

  • steps_per_epoch требуется аргумент Model.fit . Вы можете выбрать значение, которое обеспечивает соответствующие интервалы в эпохе.
  • ParameterServerStrategy не поддерживает пользовательские обратные вызовы, которые имеют вызовы на уровне пакетов по соображениям производительности. Вы должны преобразовать эти вызовы в вызовы уровня эпохи с соответствующим образом выбранным steps_per_epoch , чтобы они вызывались через число шагов steps_per_epoch . Встроенные обратные вызовы не затронуты: их вызовы на уровне пакетов были изменены, чтобы повысить производительность. Планируется поддержка вызовов пакетного уровня для ParameterServerStrategy .
  • По той же причине, в отличие от других стратегий, индикатор выполнения и метрики регистрируются только на границах эпох.
  • run_eagerly не поддерживается.

Индивидуальные особенности тренировочного цикла