Распределенное обучение с TensorFlow

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

Обзор

tf.distribute.Strategy — это API TensorFlow для распределения обучения между несколькими графическими процессорами, несколькими машинами или TPU. Используя этот API, вы можете распространять существующие модели и обучающий код с минимальными изменениями кода.

tf.distribute.Strategy был разработан с учетом следующих ключевых целей:

  • Простота в использовании и поддержка нескольких сегментов пользователей, включая исследователей, инженеров по машинному обучению и т. д.
  • Обеспечьте хорошую производительность из коробки.
  • Легкое переключение между стратегиями.

Вы можете распространять обучение с помощью tf.distribute.Strategy с помощью высокоуровневого API, такого как Model.fit , а также настраиваемые циклы обучения (и вообще любые вычисления с использованием TensorFlow).

В TensorFlow 2.x вы можете выполнять свои программы быстро или в графе, используя tf.function . tf.distribute.Strategy поддерживает оба этих режима выполнения, но лучше всего работает с tf.function . Режим Eager рекомендуется только для целей отладки и не поддерживается для tf.distribute.TPUStrategy . Хотя основное внимание в этом руководстве уделяется обучению, этот API также можно использовать для распределения оценок и прогнозов на разных платформах.

Вы можете использовать tf.distribute.Strategy с очень небольшими изменениями в своем коде, потому что базовые компоненты TensorFlow были изменены, чтобы они учитывали стратегию. Сюда входят переменные, слои, модели, оптимизаторы, метрики, сводки и контрольные точки.

В этом руководстве вы узнаете о различных типах стратегий и о том, как их можно использовать в разных ситуациях. Чтобы узнать, как устранять проблемы с производительностью, ознакомьтесь с руководством по оптимизации производительности графического процессора TensorFlow .

Настроить TensorFlow.

import tensorflow as tf

Типы стратегий

tf.distribute.Strategy намеревается охватить ряд вариантов использования по разным осям. Некоторые из этих комбинаций поддерживаются в настоящее время, а другие будут добавлены в будущем. Некоторые из этих осей:

  • Синхронное и асинхронное обучение: это два распространенных способа распределения обучения с параллелизмом данных. При синхронном обучении все рабочие тренируются на разных срезах входных данных синхронно и агрегируют градиенты на каждом этапе. При асинхронном обучении все рабочие независимо учатся на входных данных и асинхронно обновляют переменные. Обычно обучение синхронизации поддерживается с помощью all-reduce и асинхронности через архитектуру сервера параметров.
  • Аппаратная платформа: вы можете масштабировать свое обучение на нескольких графических процессорах на одном компьютере или на нескольких компьютерах в сети (с 0 или более графическими процессорами на каждом) или на облачных TPU.

Для поддержки этих вариантов использования в TensorFlow есть MirroredStrategy , TPUStrategy , MultiWorkerMirroredStrategy , ParameterServerStrategy , CentralStorageStrategy , а также другие доступные стратегии. В следующем разделе объясняется, какие из них поддерживаются в каких сценариях в TensorFlow. Вот краткий обзор:

API обучения MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy CentralStorageStrategy ParameterServerStrategy
Керас Model.fit Поддерживается Поддерживается Поддерживается Экспериментальная поддержка Экспериментальная поддержка
Пользовательский цикл обучения Поддерживается Поддерживается Поддерживается Экспериментальная поддержка Экспериментальная поддержка
API оценки Ограниченная поддержка Не поддерживается Ограниченная поддержка Ограниченная поддержка Ограниченная поддержка

MirroredСтратегии

tf.distribute.MirroredStrategy поддерживает синхронное распределенное обучение на нескольких графических процессорах на одной машине. Он создает одну реплику на устройство GPU. Каждая переменная в модели зеркально отражается во всех репликах. Вместе эти переменные образуют единую концептуальную переменную с именем MirroredVariable . Эти переменные синхронизируются друг с другом путем применения идентичных обновлений.

Эффективные алгоритмы all-reduce используются для передачи обновлений переменных между устройствами. All-reduce объединяет тензоры на всех устройствах путем их сложения и делает их доступными на каждом устройстве. Это объединенный алгоритм, который очень эффективен и может значительно сократить накладные расходы на синхронизацию. Существует множество доступных алгоритмов и реализаций all-reduce, в зависимости от типа связи, доступной между устройствами. По умолчанию он использует Коллективную коммуникационную библиотеку NVIDIA ( NCCL ) в качестве реализации all-reduce. Вы можете выбрать из нескольких других вариантов или написать свой собственный.

Вот самый простой способ создания MirroredStrategy :

mirrored_strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Это создаст экземпляр MirroredStrategy , который будет использовать все графические процессоры, видимые для TensorFlow, и NCCL — для связи между устройствами.

Если вы хотите использовать на своем компьютере только некоторые графические процессоры, вы можете сделать это следующим образом:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])
WARNING:tensorflow:Some requested devices in `tf.distribute.Strategy` are not visible to TensorFlow: /job:localhost/replica:0/task:0/device:GPU:1,/job:localhost/replica:0/task:0/device:GPU:0
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1')

Если вы хотите переопределить взаимодействие между устройствами, вы можете сделать это с помощью аргумента cross_device_ops , предоставив экземпляр tf.distribute.CrossDeviceOps . В настоящее время tf.distribute.HierarchicalCopyAllReduce и tf.distribute.ReductionToOneDevice — это два параметра, отличные от tf.distribute.NcclAllReduce , который используется по умолчанию.

mirrored_strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

TPСтратегии

tf.distribute.TPUStrategy позволяет проводить обучение TensorFlow на Tensor Processing Units (TPU) . TPU — это специализированные ASIC от Google, предназначенные для значительного ускорения рабочих нагрузок машинного обучения. Они доступны в Google Colab , TPU Research Cloud и Cloud TPU .

С точки зрения архитектуры распределенного обучения TPUStrategy — это тот же MirroredStrategy — он реализует синхронное распределенное обучение. TPU обеспечивают собственную реализацию эффективного all-reduce и других коллективных операций на нескольких ядрах TPU, которые используются в TPUStrategy .

Вот как вы могли бы создать экземпляр TPUStrategy :

cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
    tpu=tpu_address)
tf.config.experimental_connect_to_cluster(cluster_resolver)
tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
tpu_strategy = tf.distribute.TPUStrategy(cluster_resolver)

Экземпляр TPUClusterResolver помогает найти TPU. В Colab вам не нужно указывать какие-либо аргументы.

Если вы хотите использовать это для облачных TPU:

  • Вы должны указать имя вашего ресурса TPU в аргументе tpu .
  • Вы должны явно инициализировать систему TPU в начале программы. Это необходимо, прежде чем TPU можно будет использовать для вычислений. Инициализация системы TPU также стирает память TPU, поэтому важно сначала выполнить этот шаг, чтобы избежать потери состояния.

MultiWorkerMirroredСтратегии

tf.distribute.MultiWorkerMirroredStrategy очень похож на MirroredStrategy . Он реализует синхронное распределенное обучение для нескольких рабочих, каждый из которых может иметь несколько графических процессоров. Подобно tf.distribute.MirroredStrategy , он создает копии всех переменных в модели на каждом устройстве для всех рабочих процессов.

Вот самый простой способ создания MultiWorkerMirroredStrategy :

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy имеет две реализации для обмена данными между устройствами. CommunicationImplementation.RING основан на RPC и поддерживает как ЦП, так и ГП. CommunicationImplementation.NCCL использует NCCL и обеспечивает современную производительность на графических процессорах, но не поддерживает ЦП. CollectiveCommunication.AUTO передает выбор Tensorflow. Вы можете указать их следующим образом:

communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=communication_options)
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.NCCL

Одним из ключевых отличий обучения с несколькими работниками по сравнению с обучением с несколькими графическими процессорами является настройка с несколькими работниками. Переменная среды 'TF_CONFIG' — это стандартный способ в TensorFlow указать конфигурацию кластера для каждого рабочего, который является частью кластера. Узнайте больше в разделе настройки TF_CONFIG этого документа.

Дополнительные сведения о MultiWorkerMirroredStrategy в следующих руководствах:

ПараметрСерверСтратегия

Обучение сервера параметров — это распространенный метод параллельного анализа данных для масштабирования обучения модели на нескольких машинах. Учебный кластер сервера параметров состоит из рабочих процессов и серверов параметров. Переменные создаются на серверах параметров, и они считываются и обновляются работниками на каждом этапе. Подробности см. в учебном пособии по серверу параметров .

В TensorFlow 2 для обучения сервера параметров используется архитектура на основе центрального координатора через класс tf.distribute.experimental.coordinator.ClusterCoordinator .

В этой реализации worker задачи и задачи parameter server запускают tf.distribute.Server , которые прослушивают задачи от координатора. Координатор создает ресурсы, распределяет обучающие задачи, записывает контрольные точки и занимается ошибками задач.

При программировании координатора вы будете использовать объект ParameterServerStrategy для определения шага обучения и использовать ClusterCoordinator для отправки шагов обучения удаленным работникам. Вот самый простой способ их создания:

strategy = tf.distribute.experimental.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver(),
    variable_partitioner=variable_partitioner)
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(
    strategy)

Чтобы узнать больше о ParameterServerStrategy , ознакомьтесь с обучением сервера параметров с помощью Keras Model.fit и учебным пособием по пользовательскому циклу обучения.

В TensorFlow 1 ParameterServerStrategy доступен только с оценщиком через символ tf.compat.v1.distribute.experimental.ParameterServerStrategy .

CentralStorageСтратегии

tf.distribute.experimental.CentralStorageStrategy выполняет синхронное обучение. Переменные не зеркалируются, вместо этого они размещаются на ЦП, а операции реплицируются на все локальные графические процессоры. Если есть только один графический процессор, все переменные и операции будут размещены на этом графическом процессоре.

Создайте экземпляр CentralStorageStrategy , выполнив следующие действия:

central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:localhost/replica:0/task:0/device:GPU:0'], variable_device = '/job:localhost/replica:0/task:0/device:GPU:0'

Это создаст экземпляр CentralStorageStrategy , который будет использовать все видимые графические и центральные процессоры. Обновление переменных в репликах будет агрегироваться перед применением к переменным.

Другие стратегии

В дополнение к вышеуказанным стратегиям есть еще две стратегии, которые могут быть полезны для прототипирования и отладки при использовании API tf.distribute .

Стратегия по умолчанию

Стратегия по умолчанию — это стратегия распространения, которая присутствует, когда в область действия не входит явная стратегия распространения. Он реализует интерфейс tf.distribute.Strategy , но является сквозным и не обеспечивает фактического распределения. Например, Strategy.run(fn) просто вызовет fn . Код, написанный с использованием этой стратегии, должен вести себя точно так же, как код, написанный без какой-либо стратегии. Вы можете думать об этом как о стратегии «без операции».

Стратегия по умолчанию — это синглтон, и нельзя создать больше ее экземпляров. Его можно получить с помощью tf.distribute.get_strategy за пределами области действия любой явной стратегии (тот же API, который можно использовать для получения текущей стратегии внутри области действия явной стратегии).

default_strategy = tf.distribute.get_strategy()

Эта стратегия служит двум основным целям:

  • Это позволяет безоговорочно писать код библиотеки с учетом распространения. Например, в tf.optimizer вы можете использовать tf.distribute.get_strategy и использовать эту стратегию для уменьшения градиентов — он всегда будет возвращать объект стратегии, для которого вы можете вызвать API Strategy.reduce .
# In optimizer or other library code
# Get currently active strategy
strategy = tf.distribute.get_strategy()
strategy.reduce("SUM", 1., axis=None)  # reduce some values
1.0
  • Подобно библиотечному коду, его можно использовать для написания программ конечных пользователей для работы со стратегией распространения и без нее, не требуя условной логики. Вот пример фрагмента кода, иллюстрирующий это:
if tf.config.list_physical_devices('GPU'):
  strategy = tf.distribute.MirroredStrategy()
else:  # Use the Default Strategy
  strategy = tf.distribute.get_strategy()

with strategy.scope():
  # Do something interesting
  print(tf.Variable(1.))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
MirroredVariable:{
  0: <tf.Variable 'Variable:0' shape=() dtype=float32, numpy=1.0>
}

OneDeviceСтратегия

tf.distribute.OneDeviceStrategy — это стратегия размещения всех переменных и вычислений на одном указанном устройстве.

strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")

Эта стратегия во многом отличается от стратегии по умолчанию. В стратегии по умолчанию логика размещения переменных остается неизменной по сравнению с запуском TensorFlow без какой-либо стратегии распределения. Но при использовании OneDeviceStrategy все переменные, созданные в его области видимости, явно размещаются на указанном устройстве. Более того, любые функции, вызываемые через OneDeviceStrategy.run , также будут размещены на указанном устройстве.

Ввод, распространяемый с помощью этой стратегии, будет предварительно загружаться на указанное устройство. В Стратегии по умолчанию нет распределения входных данных.

Подобно стратегии по умолчанию, эту стратегию также можно использовать для тестирования вашего кода перед переключением на другие стратегии, которые фактически распространяются на несколько устройств/машин. Это задействует механизм стратегии распределения несколько больше, чем Стратегия по умолчанию, но не в полной мере, используя, например, MirroredStrategy или TPUStrategy . Если вам нужен код, который ведет себя так, как будто стратегии нет, используйте стратегию по умолчанию.

До сих пор вы узнали о различных стратегиях и о том, как их можно реализовать. В следующих нескольких разделах показаны различные способы их использования для распространения обучения.

Используйте tf.distribute.Strategy с Keras Model.fit

tf.distribute.Strategy интегрирован в tf.keras , который является реализацией TensorFlow спецификации Keras API . tf.keras — это высокоуровневый API для построения и обучения моделей. Интегрируясь в tf.keras , вы можете легко распространять свое обучение, написанное в среде обучения Keras, с помощью Model.fit .

Вот что вам нужно изменить в вашем коде:

  1. Создайте экземпляр соответствующего tf.distribute.Strategy .
  2. Переместите создание модели Keras, оптимизатора и метрик внутрь strategy.scope .

Стратегии распространения TensorFlow поддерживают все типы моделей Keras — Sequential , Functional и subclassed .

Вот фрагмент кода для очень простой модели Keras с одним Dense слоем:

mirrored_strategy = tf.distribute.MirroredStrategy()

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])

model.compile(loss='mse', optimizer='sgd')
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).

В этом примере используется MirroredStrategy , поэтому вы можете запустить его на машине с несколькими графическими процессорами. strategy.scope() указывает Keras, какую стратегию использовать для распространения обучения. Создание моделей/оптимизаторов/метрик внутри этой области позволяет создавать распределенные переменные вместо обычных переменных. Как только это настроено, вы можете подогнать свою модель, как обычно. MirroredStrategy позаботится о воспроизведении обучения модели на доступных графических процессорах, агрегировании градиентов и многом другом.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(10)
model.fit(dataset, epochs=2)
model.evaluate(dataset)
Epoch 1/2
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
2021-10-26 01:27:56.527729: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
10/10 [==============================] - 3s 2ms/step - loss: 2.2552
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
INFO:tensorflow:Reduce to /job:localhost/replica:0/task:0/device:CPU:0 then broadcast to ('/job:localhost/replica:0/task:0/device:CPU:0',).
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.9968
2021-10-26 01:27:59.372113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 1s 2ms/step - loss: 0.6190
0.6190494298934937

Здесь tf.data.Dataset предоставляет данные для обучения и оценки. Вы также можете использовать массивы NumPy:

import numpy as np

inputs, targets = np.ones((100, 1)), np.ones((100, 1))
model.fit(inputs, targets, epochs=2, batch_size=10)
Epoch 1/2
2021-10-26 01:28:00.609977: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_9"
op: "FlatMapDataset"
input: "PrefetchDataset/_8"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_slice_batch_indices_997"
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 10
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.
10/10 [==============================] - 1s 2ms/step - loss: 0.4406
Epoch 2/2
10/10 [==============================] - 0s 2ms/step - loss: 0.1947
<keras.callbacks.History at 0x7fb81813d2d0>

В обоих случаях — с Dataset или NumPy — каждый пакет данного ввода делится поровну между несколькими репликами. Например, если вы используете MirroredStrategy с 2 графическими процессорами, каждый пакет размером 10 будет разделен между 2 графическими процессорами, при этом каждый получит 5 входных примеров на каждом этапе. Затем каждая эпоха будет обучаться быстрее по мере добавления новых графических процессоров. Как правило, вы захотите увеличить размер пакета по мере добавления дополнительных ускорителей, чтобы эффективно использовать дополнительную вычислительную мощность. Вам также потребуется перенастроить скорость обучения в зависимости от модели. Вы можете использовать strategy.num_replicas_in_sync чтобы получить количество реплик.

# Compute a global batch size using a number of replicas.
BATCH_SIZE_PER_REPLICA = 5
global_batch_size = (BATCH_SIZE_PER_REPLICA *
                     mirrored_strategy.num_replicas_in_sync)
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100)
dataset = dataset.batch(global_batch_size)

LEARNING_RATES_BY_BATCH_SIZE = {5: 0.1, 10: 0.15}
learning_rate = LEARNING_RATES_BY_BATCH_SIZE[global_batch_size]

Что поддерживается сейчас?

API обучения MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Керас Model.fit Поддерживается Поддерживается Поддерживается Экспериментальная поддержка Экспериментальная поддержка

Примеры и руководства

Вот список руководств и примеров, иллюстрирующих приведенную выше сквозную интеграцию с Model.fit :

  1. Учебник : Обучение с Model.fit и MirroredStrategy .
  2. Учебное пособие : Обучение работе с Model.fit и MultiWorkerMirroredStrategy .
  3. Руководство : содержит пример использования Model.fit и TPUStrategy .
  4. Учебное пособие . Обучение сервера параметров с помощью Model.fit и ParameterServerStrategy .
  5. Учебное пособие : тонкая настройка BERT для многих задач из бенчмарка GLUE с помощью Model.fit и TPUStrategy .
  6. Репозиторий TensorFlow Model Garden, содержащий коллекции современных моделей, реализованных с использованием различных стратегий.

Используйте tf.distribute.Strategy с пользовательскими циклами обучения

Как показано выше, использование tf.distribute.Strategy с Model.fit требует изменения всего пары строк вашего кода. Приложив немного больше усилий, вы также можете использовать tf.distribute.Strategy с пользовательскими циклами обучения .

Если вам нужно больше гибкости и контроля над вашими циклами обучения, чем это возможно с Estimator или Keras, вы можете написать собственные циклы обучения. Например, при использовании GAN вы можете захотеть делать разное количество шагов генератора или дискриминатора в каждом раунде. Точно так же структуры высокого уровня не очень подходят для обучения с подкреплением.

Классы tf.distribute.Strategy предоставляют базовый набор методов для поддержки настраиваемых циклов обучения. Их использование может изначально потребовать незначительной реструктуризации кода, но как только это будет сделано, вы сможете переключаться между GPU, TPU и несколькими машинами, просто изменяя экземпляр стратегии.

Ниже приведен краткий фрагмент, иллюстрирующий этот вариант использования для простого обучающего примера с использованием той же модели Keras, что и раньше.

Сначала создайте модель и оптимизатор в рамках стратегии. Это гарантирует, что любые переменные, созданные с помощью модели и оптимизатора, являются зеркальными переменными.

with mirrored_strategy.scope():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
  optimizer = tf.keras.optimizers.SGD()

Затем создайте входной набор данных и вызовите tf.distribute.Strategy.experimental_distribute_dataset , чтобы распределить набор данных на основе стратегии.

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(
    global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
2021-10-26 01:28:01.831942: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:695] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

Затем определите один шаг обучения. Используйте tf.GradientTape для вычисления градиентов и оптимизатора для применения этих градиентов для обновления переменных вашей модели. Чтобы распространить этот обучающий шаг, поместите его в функцию train_step и передайте в tf.distribute.Strategy.run вместе с входными данными набора данных, которые вы получили из ранее созданного dist_dataset :

loss_object = tf.keras.losses.BinaryCrossentropy(
  from_logits=True,
  reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, predictions):
  per_example_loss = loss_object(labels, predictions)
  return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
  features, labels = inputs

  with tf.GradientTape() as tape:
    predictions = model(features, training=True)
    loss = compute_loss(labels, predictions)

  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))
  return loss

@tf.function
def distributed_train_step(dist_inputs):
  per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
  return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
                         axis=None)

Несколько других вещей, которые следует отметить в приведенном выше коде:

  1. Вы использовали tf.nn.compute_average_loss для вычисления потерь. tf.nn.compute_average_loss суммирует потери для каждого примера и делит сумму на global_batch_size . Это важно, потому что позже, после расчета градиентов для каждой реплики, они агрегируются по репликам путем их суммирования .
  2. Вы также использовали API tf.distribute.Strategy.reduce для агрегирования результатов, возвращаемых tf.distribute.Strategy.run . tf.distribute.Strategy.run возвращает результаты каждой локальной реплики в стратегии, и существует несколько способов использования этого результата. Вы можете reduce их, чтобы получить агрегированное значение. Вы также можете выполнить tf.distribute.Strategy.experimental_local_results , чтобы получить список значений, содержащихся в результате, по одному на локальную реплику.
  3. Когда вы вызываете apply_gradients в рамках стратегии распределения, его поведение изменяется. В частности, перед применением градиентов к каждому параллельному экземпляру во время синхронного обучения он выполняет суммирование всех реплик градиентов.

Наконец, после того как вы определили шаг обучения, вы можете dist_dataset и запустить обучение в цикле:

for dist_inputs in dist_dataset:
  print(distributed_train_step(dist_inputs))
tf.Tensor(0.18686396, shape=(), dtype=float32)
tf.Tensor(0.18628375, shape=(), dtype=float32)
tf.Tensor(0.18570684, shape=(), dtype=float32)
tf.Tensor(0.18513316, shape=(), dtype=float32)
tf.Tensor(0.1845627, shape=(), dtype=float32)
tf.Tensor(0.18399543, shape=(), dtype=float32)
tf.Tensor(0.18343134, shape=(), dtype=float32)
tf.Tensor(0.18287037, shape=(), dtype=float32)
tf.Tensor(0.18231256, shape=(), dtype=float32)
tf.Tensor(0.18175781, shape=(), dtype=float32)
tf.Tensor(0.18120615, shape=(), dtype=float32)
tf.Tensor(0.18065754, shape=(), dtype=float32)
tf.Tensor(0.18011193, shape=(), dtype=float32)
tf.Tensor(0.17956935, shape=(), dtype=float32)
tf.Tensor(0.17902976, shape=(), dtype=float32)
tf.Tensor(0.17849308, shape=(), dtype=float32)
tf.Tensor(0.17795937, shape=(), dtype=float32)
tf.Tensor(0.17742859, shape=(), dtype=float32)
tf.Tensor(0.17690066, shape=(), dtype=float32)
tf.Tensor(0.17637561, shape=(), dtype=float32)

В приведенном выше примере вы перебирали dist_dataset , чтобы ввести данные для обучения. Вам также предоставляется tf.distribute.Strategy.make_experimental_numpy_dataset для поддержки входных данных NumPy. Вы можете использовать этот API для создания набора данных перед вызовом tf.distribute.Strategy.experimental_distribute_dataset .

Другой способ перебора ваших данных — явное использование итераторов. Вы можете сделать это, если хотите выполнить заданное количество шагов, а не перебирать весь набор данных. Вышеупомянутая итерация теперь будет изменена, чтобы сначала создать итератор, а затем явно вызвать next для него, чтобы получить входные данные.

iterator = iter(dist_dataset)
for _ in range(10):
  print(distributed_train_step(next(iterator)))
tf.Tensor(0.17585339, shape=(), dtype=float32)
tf.Tensor(0.17533402, shape=(), dtype=float32)
tf.Tensor(0.17481743, shape=(), dtype=float32)
tf.Tensor(0.17430364, shape=(), dtype=float32)
tf.Tensor(0.17379259, shape=(), dtype=float32)
tf.Tensor(0.17328428, shape=(), dtype=float32)
tf.Tensor(0.17277871, shape=(), dtype=float32)
tf.Tensor(0.17227581, shape=(), dtype=float32)
tf.Tensor(0.17177561, shape=(), dtype=float32)
tf.Tensor(0.17127804, shape=(), dtype=float32)

Это охватывает простейший случай использования tf.distribute.Strategy API для распространения пользовательских циклов обучения.

Что поддерживается сейчас?

API обучения MirroredStrategy TPUStrategy MultiWorkerMirroredStrategy ParameterServerStrategy CentralStorageStrategy
Пользовательский цикл обучения Поддерживается Поддерживается Поддерживается Экспериментальная поддержка Экспериментальная поддержка

Примеры и руководства

Вот несколько примеров использования стратегий распределения с пользовательскими циклами обучения:

  1. Учебное пособие : обучение с пользовательским циклом обучения и MirroredStrategy .
  2. Учебное пособие : обучение с пользовательским циклом обучения и MultiWorkerMirroredStrategy .
  3. Руководство : Содержит пример пользовательского цикла обучения с помощью TPUStrategy .
  4. Учебное пособие . Обучение сервера параметров с пользовательским циклом обучения и ParameterServerStrategy .
  5. Репозиторий TensorFlow Model Garden, содержащий коллекции современных моделей, реализованных с использованием различных стратегий.

Другие темы

В этом разделе рассматриваются некоторые темы, относящиеся к нескольким вариантам использования.

Настройка переменной окружения TF_CONFIG

Для обучения нескольких сотрудников, как упоминалось ранее, вам необходимо настроить переменную среды 'TF_CONFIG' для каждого двоичного файла, работающего в вашем кластере. Переменная среды 'TF_CONFIG' представляет собой строку JSON, которая указывает, какие задачи составляют кластер, их адреса и роль каждой задачи в кластере. tensorflow/ecosystem предоставляет шаблон Kubernetes, который настраивает 'TF_CONFIG' для ваших учебных задач.

Есть два компонента 'TF_CONFIG' : кластер и задача.

  • Кластер предоставляет информацию об обучающем кластере, который представляет собой список, состоящий из различных типов заданий, таких как рабочие. При обучении с несколькими работниками обычно есть один работник, который берет на себя немного больше обязанностей, таких как сохранение контрольной точки и написание сводного файла для TensorBoard, в дополнение к тому, что делает обычный работник. Такой воркер называется «главным» воркером, и принято, что воркер с индексом 0 назначается главным воркером (собственно так и реализован tf.distribute.Strategy ).
  • С другой стороны, задача предоставляет информацию о текущей задаче. Кластер первого компонента одинаков для всех рабочих процессов, а задача второго компонента отличается для каждого рабочего процесса и указывает тип и индекс этого рабочего процесса.

Один из примеров 'TF_CONFIG' :

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"]
    },
   "task": {"type": "worker", "index": 1}
})

Этот 'TF_CONFIG' указывает, что в "cluster" есть три рабочих процесса и две задачи "ps" , а также их хосты и порты. Часть "task" определяет роль текущей задачи в "cluster" worker 1 (второй worker). Допустимыми ролями в кластере являются "chief" , "worker" , "ps" и "evaluator" . Не должно быть задания "ps" , за исключением случаев использования tf.distribute.experimental.ParameterServerStrategy .

Что дальше?

tf.distribute.Strategy активно развивается. Попробуйте и поделитесь своим мнением, используя вопросы GitHub .