Посмотреть на TensorFlow.org | Запустить в Google Colab | Посмотреть исходный код на GitHub | Скачать блокнот |
Обзор
В этом руководстве показано, как выполнять распределенное обучение с несколькими работниками с помощью модели Keras и API Model.fit
с использованием API tf.distribute.Strategy
, в частности класса tf.distribute.MultiWorkerMirroredStrategy
. С помощью этой стратегии модель Keras, разработанная для работы с одним рабочим, может беспрепятственно работать на нескольких рабочих с минимальными изменениями кода.
Для тех, кто заинтересован в более глубоком понимании API tf.distribute.Strategy
, доступно руководство « Распределенное обучение в TensorFlow » для обзора стратегий распространения, поддерживаемых TensorFlow.
Чтобы узнать, как использовать MultiWorkerMirroredStrategy
с Keras и настраиваемым циклом обучения, см. Пользовательский цикл обучения с Keras и MultiWorkerMirroredStrategy .
Обратите внимание, что целью этого руководства является демонстрация минимального примера с несколькими рабочими процессами с двумя рабочими процессами.
Настраивать
Начните с некоторых необходимых импортов:
import json
import os
import sys
Перед импортом TensorFlow внесите несколько изменений в среду:
- Отключите все графические процессоры. Это предотвращает ошибки, вызванные тем, что все рабочие процессы пытаются использовать один и тот же графический процессор. В реальном приложении каждый работник будет работать на отдельной машине.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
- Сбросьте переменную окружения
TF_CONFIG
(подробнее об этом вы узнаете позже):
os.environ.pop('TF_CONFIG', None)
- Убедитесь, что текущий каталог находится на пути Python — это позволит блокноту позже импортировать файлы, записанные
%%writefile
:
if '.' not in sys.path:
sys.path.insert(0, '.')
Теперь импортируйте TensorFlow:
import tensorflow as tf
Набор данных и определение модели
Затем создайте файл mnist_setup.py
с простой настройкой модели и набора данных. Этот файл Python будет использоваться рабочими процессами в этом руководстве:
%%writefile mnist_setup.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the [0, 255] range.
# You need to convert them to float32 with values in the [0, 1] range.
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
return train_dataset
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
Writing mnist_setup.py
Обучение модели на одном работнике
Попробуйте обучить модель на небольшом количестве эпох и понаблюдайте за результатами одного воркера , чтобы убедиться, что все работает правильно. По мере обучения потери должны снижаться, а точность увеличиваться.
import mnist_setup
batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz 11493376/11490434 [==============================] - 0s 0us/step 11501568/11490434 [==============================] - 0s 0us/step 2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected Epoch 1/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788 Epoch 2/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185 Epoch 3/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795 <keras.callbacks.History at 0x7f666a2e4510>
Многопользовательская конфигурация
Теперь давайте войдем в мир обучения нескольких сотрудников.
Кластер с заданиями и задачами
В TensorFlow распределенное обучение включает: 'cluster'
с несколькими заданиями, и каждое из заданий может иметь одну или несколько 'task'
.
Вам понадобится переменная среды конфигурации TF_CONFIG
для обучения на нескольких машинах, каждая из которых может иметь свою роль. TF_CONFIG
— это строка JSON, используемая для указания конфигурации кластера для каждого рабочего процесса, являющегося частью кластера.
Переменная TF_CONFIG
состоит из двух компонентов: 'cluster'
и 'task'
.
'cluster'
одинаков для всех работников и предоставляет информацию об обучающем кластере, который представляет собой список, состоящий из различных типов должностей, таких как'worker'
или'chief'
.- При обучении нескольких рабочих с
tf.distribute.MultiWorkerMirroredStrategy
обычно есть один'worker'
, который берет на себя обязанности, такие как сохранение контрольной точки и запись сводного файла для TensorBoard, в дополнение к тому, что делает обычный'worker'
. Такой'worker'
называется главным рабочим (с названием должности'chief'
). - Обычно
'chief'
назначается'index'
0
(собственно, так реализованtf.distribute.Strategy
).
- При обучении нескольких рабочих с
'task'
предоставляет информацию о текущей задаче и отличается для каждого работника. Он указывает'type'
и'index'
этого работника.
Ниже приведен пример конфигурации:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
Вот тот же TF_CONFIG
, сериализованный как строка JSON:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
Обратите внимание, что tf_config
— это просто локальная переменная в Python. Чтобы иметь возможность использовать его для обучающей конфигурации, этот словарь необходимо сериализовать как JSON и поместить в переменную среды TF_CONFIG
.
В приведенном выше примере конфигурации вы устанавливаете 'type'
задачи на 'worker'
а 'index'
задачи — на 0
. Поэтому эта машина первая рабочая. Он будет назначен 'chief'
работником и будет выполнять больше работы, чем другие.
В целях иллюстрации в этом руководстве показано, как можно настроить переменную TF_CONFIG
с двумя рабочими процессами на localhost
хосте.
На практике вы должны создать несколько воркеров на внешних IP-адресах/портах и соответственно установить переменную TF_CONFIG
для каждого воркера.
В этом уроке вы будете использовать двух рабочих:
- TF_CONFIG первого («начального
'chief'
)TF_CONFIG
показан выше. - Для второго воркера вы установите
tf_config['task']['index']=1
Переменные среды и подпроцессы в блокнотах
Подпроцессы наследуют переменные среды от своего родителя.
Например, вы можете установить переменную среды в этом процессе Jupyter Notebook следующим образом:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
Затем вы можете получить доступ к переменной среды из подпроцессов:
echo ${GREETINGS}
Hello TensorFlow!
В следующем разделе вы будете использовать аналогичный метод для передачи TF_CONFIG
рабочим подпроцессам. В реальном сценарии вы бы не запускали свои задания таким образом, но в этом примере этого достаточно.
Выберите правильную стратегию
В TensorFlow есть две основные формы распределенного обучения:
- Синхронное обучение , при котором этапы обучения синхронизируются между рабочими процессами и репликами, а также
- Асинхронное обучение , где этапы обучения строго не синхронизированы (например, обучение сервера параметров ).
В этом руководстве показано, как выполнять синхронное обучение нескольких сотрудников с использованием экземпляра tf.distribute.MultiWorkerMirroredStrategy
.
MultiWorkerMirroredStrategy
создает копии всех переменных в слоях модели на каждом устройстве для всех рабочих процессов. Он использует CollectiveOps
, операцию TensorFlow для коллективной коммуникации, для объединения градиентов и синхронизации переменных. Руководство tf.distribute.Strategy
содержит более подробную информацию об этой стратегии.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
MultiWorkerMirroredStrategy
предоставляет несколько реализаций с помощью параметра tf.distribute.experimental.CommunicationOptions
: 1) RING
реализует коллективы на основе кольца, используя gRPC в качестве уровня связи между узлами; 2) NCCL
использует библиотеку коллективных коммуникаций NVIDIA для реализации коллективов; и 3) AUTO
откладывает выбор до среды выполнения. Лучший выбор коллективной реализации зависит от количества и типа графических процессоров, а также сетевого соединения в кластере.
Обучите модель
С интеграцией tf.distribute.Strategy
API в tf.keras
единственное изменение, которое вы сделаете для распространения обучения среди нескольких сотрудников, — это включение построения модели и model.compile()
внутри strategy.scope()
. Область действия стратегии распределения определяет, как и где создаются переменные, а в случае MultiWorkerMirroredStrategy
создаваемые переменные являются MirroredVariable
s, и они реплицируются на каждом из рабочих процессов.
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
Чтобы на самом деле работать с MultiWorkerMirroredStrategy
, вам нужно запустить рабочие процессы и передать им TF_CONFIG
.
Как и файл mnist_setup.py
, написанный ранее, вот main.py
, который будет запускать каждый из рабочих процессов:
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist_setup
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
strategy = tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py
В приведенном выше фрагменте кода обратите внимание, что global_batch_size
, который передается в Dataset.batch
, имеет значение per_worker_batch_size * num_workers
. Это гарантирует, что каждый рабочий процесс обрабатывает пакеты примеров per_worker_batch_size
независимо от количества рабочих процессов.
Текущий каталог теперь содержит оба файла Python:
ls *.py
main.py mnist_setup.py
Итак, json-сериализуйте TF_CONFIG
и добавьте его в переменные среды:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Теперь вы можете запустить рабочий процесс, который будет запускать main.py
и использовать TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
Есть несколько замечаний по поводу приведенной выше команды:
- Он использует
%%bash
, который является «волшебством» ноутбука для запуска некоторых команд bash. - Он использует флаг
--bg
для запуска процессаbash
в фоновом режиме, потому что этот рабочий процесс не завершится. Он ждет всех рабочих перед тем, как начать.
Фоновый рабочий процесс не будет печатать выходные данные в этот блокнот, поэтому &>
перенаправляет свои выходные данные в файл, чтобы вы могли позже проверить, что произошло, в файле журнала.
Итак, подождите несколько секунд, пока процесс запустится:
import time
time.sleep(10)
Теперь проверьте, что было выведено в файл журнала рабочего процесса:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
В последней строке файла журнала должно быть написано: Started server with target: grpc://localhost:12345
. Первый рабочий теперь готов и ждет, пока все остальные рабочие будут готовы продолжить работу.
Итак, обновите tf_config
для второго рабочего процесса:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Запустите второго рабочего. Это запустит обучение, так как все рабочие активны (поэтому нет необходимости фонировать этот процесс):
python main.py
Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901 2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Если вы перепроверите журналы, написанные первым воркером, вы узнаете, что он участвовал в обучении этой модели:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
Неудивительно, что это работало медленнее , чем тестовый запуск в начале этого руководства.
Запуск нескольких воркеров на одной машине только увеличивает накладные расходы.
Цель здесь заключалась не в том, чтобы улучшить время обучения, а только в том, чтобы привести пример обучения нескольких работников.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
Углубленное обучение нескольких сотрудников
Итак, вы узнали, как выполнить базовую настройку нескольких рабочих.
В оставшейся части руководства вы подробно узнаете о других факторах, которые могут быть полезны или важны для реальных случаев использования.
Разделение набора данных
При обучении нескольких сотрудников сегментирование набора данных необходимо для обеспечения конвергенции и производительности.
В примере из предыдущего раздела используется автоматическое разбиение по умолчанию, предоставляемое API tf.distribute.Strategy
. Вы можете управлять сегментированием, установив tf.data.experimental.AutoShardPolicy
в tf.data.experimental.DistributeOptions
.
Чтобы узнать больше об автошардинге , обратитесь к руководству по распределенному вводу .
Вот краткий пример того, как отключить автоматическое сегментирование, чтобы каждая реплика обрабатывала каждый пример ( не рекомендуется ):
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
Оценка
Если вы передадите validation_data
в Model.fit
, он будет чередоваться между обучением и оценкой для каждой эпохи. Оценка, использующая validation_data
, распределяется между одним и тем же набором рабочих, а результаты оценки объединяются и доступны для всех рабочих.
Как и в случае с обучением, набор проверочных данных автоматически сегментируется на уровне файла. Вам нужно установить глобальный размер пакета в наборе данных проверки и установить validation_steps
.
Повторный набор данных также рекомендуется для оценки.
Кроме того, вы также можете создать другую задачу, которая периодически считывает контрольные точки и запускает оценку. Это то, что делает оценщик. Но это не рекомендуемый способ выполнения оценки, поэтому его детали опущены.
Представление
Теперь у вас есть модель Keras, настроенная для запуска в нескольких рабочих процессах с помощью MultiWorkerMirroredStrategy
.
Чтобы настроить производительность обучения нескольких сотрудников, вы можете попробовать следующее:
tf.distribute.MultiWorkerMirroredStrategy
предоставляет несколько реализаций коллективной коммуникации :-
RING
реализует коллективы на основе кольца, используя gRPC в качестве уровня связи между узлами. -
NCCL
использует библиотеку коллективных коммуникаций NVIDIA для реализации коллективов. -
AUTO
откладывает выбор до среды выполнения.
Лучший выбор коллективной реализации зависит от количества графических процессоров, типа графических процессоров и сетевого соединения в кластере. Чтобы переопределить автоматический выбор, укажите параметр
communication_options
конструктораMultiWorkerMirroredStrategy
. Например:communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
-
Приведите переменные к
tf.float
, если это возможно:- Официальная модель ResNet включает пример того, как это можно сделать.
Отказоустойчивость
При синхронном обучении кластер выйдет из строя, если один из рабочих выйдет из строя, а механизма восстановления после сбоя не существует.
Использование Keras с tf.distribute.Strategy
преимущество отказоустойчивости в случаях, когда рабочие умирают или иным образом нестабильны. Это можно сделать, сохранив состояние обучения в распределенной файловой системе по вашему выбору, чтобы после перезапуска экземпляра, который ранее дал сбой или был вытеснен, состояние обучения восстанавливалось.
Когда рабочий становится недоступным, другие рабочие перестают работать (возможно, после тайм-аута). В таких случаях необходимо перезапустить недоступный воркер, а также другие воркеры, вышедшие из строя.
Обратный вызов ModelCheckpoint
Обратный вызов ModelCheckpoint
больше не обеспечивает отказоустойчивость. Вместо этого используйте обратный вызов BackupAndRestore
.
Обратный вызов ModelCheckpoint
по-прежнему можно использовать для сохранения контрольных точек. Но при этом, если обучение было прервано или успешно завершено, для продолжения обучения с контрольной точки пользователь должен загрузить модель вручную.
При желании пользователь может сохранить и восстановить модель/веса вне обратного вызова ModelCheckpoint
.
Сохранение и загрузка модели
Чтобы сохранить вашу модель с помощью model.save
или tf.saved_model.save
, место сохранения должно быть разным для каждого работника.
- Для неглавных работников вам нужно будет сохранить модель во временный каталог.
- Для начальника вам нужно будет сохранить в указанный каталог модели.
Временные каталоги рабочего процесса должны быть уникальными, чтобы предотвратить ошибки, возникающие из-за того, что несколько рабочих процессов пытаются выполнить запись в одно и то же место.
Модель, сохраненная во всех каталогах, идентична, и обычно для восстановления или обслуживания следует ссылаться только на модель, сохраненную руководителем.
У вас должна быть некоторая логика очистки, которая удаляет временные каталоги, созданные рабочими процессами после завершения обучения.
Причина экономии на начальнике и рабочих одновременно заключается в том, что вы можете агрегировать переменные во время контрольной точки, что требует участия начальника и рабочих в протоколе связи allreduce. С другой стороны, разрешение руководителю и рабочим сохранять файлы в одном и том же каталоге модели приведет к ошибкам из-за конкуренции.
Используя MultiWorkerMirroredStrategy
, программа запускается на каждом воркере, и чтобы узнать, является ли текущий воркер главным, она использует объект распознавателя кластера, который имеет атрибуты task_type
и task_id
:
-
task_type
сообщает вам, что такое текущая работа (например'worker'
). -
task_id
сообщает вам идентификатор работника. - Воркер с
task_id == 0
назначается главным воркером.
В приведенном ниже фрагменте кода функция write_filepath
предоставляет путь к файлу для записи, который зависит от task_id
:
- Для главного воркера (с
task_id == 0
) пишет в исходный путь к файлу. - Для других воркеров он создает временный каталог —
temp_dir
— сtask_id
в пути к каталогу для записи:
model_path = '/tmp/keras-model'
def _is_chief(task_type, task_id):
# Note: there are two possible `TF_CONFIG` configuration.
# 1) In addition to `worker` tasks, a `chief` task type is use;
# in this case, this function should be modified to
# `return task_type == 'chief'`.
# 2) Only `worker` task type is used; in this case, worker 0 is
# regarded as the chief. The implementation demonstrated here
# is for this case.
# For the purpose of this Colab section, the `task_type is None` case
# is added because it is effectively run with only a single worker.
return (task_type == 'worker' and task_id == 0) or task_type is None
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
Теперь вы готовы экономить:
multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/keras-model/assets INFO:tensorflow:Assets written to: /tmp/keras-model/assets
Как описано выше, в дальнейшем модель должна загружаться только из пути, в котором сохранен главный, поэтому давайте удалим временные, сохраненные неглавными работниками:
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(os.path.dirname(write_model_path))
Теперь, когда пришло время загрузки, воспользуемся удобным API tf.keras.models.load_model
и продолжим дальнейшую работу.
Здесь предположим, что для загрузки и продолжения обучения используется только один рабочий процесс, и в этом случае вы не вызываете tf.keras.models.load_model
в другом strategy.scope()
(обратите внимание, что strategy = tf.distribute.MultiWorkerMirroredStrategy()
, как определено ранее ):
loaded_model = tf.keras.models.load_model(model_path)
# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2 20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773 <keras.callbacks.History at 0x7f6669989750>
Сохранение и восстановление контрольной точки
С другой стороны, контрольные точки позволяют сохранять веса вашей модели и восстанавливать их без сохранения всей модели.
Здесь вы создадите одну tf.train.Checkpoint
, которая отслеживает модель, которой управляет tf.train.CheckpointManager
, так что сохраняется только последняя контрольная точка:
checkpoint_dir = '/tmp/ckpt'
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
После настройки CheckpointManager
вы готовы сохранять и удалять контрольные точки, сохраненные неглавными работниками:
checkpoint_manager.save()
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(write_checkpoint_dir)
Теперь, когда вам нужно восстановить модель, вы можете найти последнюю сохраненную контрольную точку с помощью удобной функции tf.train.latest_checkpoint
. После восстановления чекпоинта можно продолжить обучение.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/2 2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. 20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938 <keras.callbacks.History at 0x7f6669589850>
Обратный вызов BackupAndRestore
Обратный вызов tf.keras.callbacks.BackupAndRestore
обеспечивает функциональность отказоустойчивости путем резервного копирования модели и текущего номера эпохи во временном файле контрольной точки с аргументом backup_dir
для BackupAndRestore
. Это делается в конце каждой эпохи.
Как только задания прерываются и перезапускаются, обратный вызов восстанавливает последнюю контрольную точку, и обучение продолжается с начала прерванной эпохи. Любое частичное обучение, уже выполненное в незавершенную эпоху до прерывания, будет отброшено, чтобы оно не повлияло на конечное состояние модели.
Чтобы использовать его, предоставьте экземпляр tf.keras.callbacks.BackupAndRestore
при вызове Model.fit
.
С MultiWorkerMirroredStrategy
, если рабочий процесс прерывается, весь кластер приостанавливается до тех пор, пока прерванный рабочий процесс не будет перезапущен. Другие рабочие процессы также будут перезапущены, а прерванный рабочий процесс снова присоединится к кластеру. Затем каждый рабочий процесс считывает ранее сохраненный файл контрольной точки и восстанавливает свое прежнее состояние, тем самым позволяя кластеру снова синхронизироваться. Затем обучение продолжается.
Обратный вызов BackupAndRestore
использует CheckpointManager
для сохранения и восстановления состояния обучения, который создает файл с именем контрольная точка, который отслеживает существующие контрольные точки вместе с последней. По этой причине backup_dir
не следует повторно использовать для хранения других контрольных точек, чтобы избежать конфликта имен.
В настоящее время обратный вызов BackupAndRestore
поддерживает обучение одного работника без стратегии — MirroredStrategy
— и обучение нескольких работников с помощью MultiWorkerMirroredStrategy
.
Ниже приведены два примера как для обучения нескольких работников, так и для обучения одного работника:
# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/3 70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123 Epoch 2/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509 Epoch 3/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614 <keras.callbacks.History at 0x7f6669555d90>
Если вы проверите каталог backup_dir
, который вы указали в BackupAndRestore
, вы можете заметить некоторые временно сгенерированные файлы контрольных точек. Эти файлы необходимы для восстановления ранее потерянных экземпляров, и они будут удалены библиотекой в конце Model.fit
после успешного завершения вашего обучения.
Дополнительные ресурсы
- Руководство по распределенному обучению в TensorFlow содержит обзор доступных стратегий распространения.
- В учебном пособии « Пользовательский цикл обучения с Keras и MultiWorkerMirroredStrategy» показано, как использовать
MultiWorkerMirroredStrategy
с Keras и пользовательским циклом обучения. - Ознакомьтесь с официальными моделями , многие из которых можно настроить для запуска нескольких стратегий распространения.
- Руководство Повышение производительности с помощью tf.function содержит информацию о других стратегиях и инструментах, таких как профилировщик TensorFlow , который вы можете использовать для оптимизации производительности своих моделей TensorFlow.