Посмотреть на TensorFlow.org | Запустить в Google Colab | Посмотреть исходный код на GitHub | Скачать блокнот |
Обзор
В этом руководстве демонстрируется обучение нескольких рабочих с помощью API пользовательского цикла обучения, распространяемого через MultiWorkerMirroredStrategy, поэтому модель Keras, предназначенная для работы с одним рабочим, может беспрепятственно работать на нескольких рабочих с минимальным изменением кода.
Мы используем настраиваемые циклы обучения для обучения нашей модели, потому что они дают нам гибкость и больший контроль над обучением. Кроме того, легче отладить модель и цикл обучения. Более подробная информация доступна в разделе Написание обучающего цикла с нуля .
Если вы ищете, как использовать MultiWorkerMirroredStrategy
с keras model.fit
, обратитесь к этому руководству .
Руководство по распределенному обучению TensorFlow доступно для обзора стратегий распространения, поддерживаемых TensorFlow, для тех, кто заинтересован в более глубоком понимании API tf.distribute.Strategy
.
Настраивать
Во-первых, необходимый импорт.
import json
import os
import sys
Перед импортом TensorFlow внесите несколько изменений в среду.
Отключите все графические процессоры. Это предотвращает ошибки, вызванные тем, что все рабочие процессы пытаются использовать один и тот же графический процессор. Для реального приложения каждый работник будет на другой машине.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
Сбросьте переменную окружения TF_CONFIG
, об этом вы узнаете позже.
os.environ.pop('TF_CONFIG', None)
Убедитесь, что текущий каталог находится на пути Python. Это позволяет блокноту позже импортировать файлы, записанные с помощью %%writefile
.
if '.' not in sys.path:
sys.path.insert(0, '.')
Теперь импортируйте TensorFlow.
import tensorflow as tf
Набор данных и определение модели
Затем создайте файл mnist.py
с простой настройкой модели и набора данных. Этот файл python будет использоваться рабочими процессами в этом руководстве:
%%writefile mnist.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the range [0, 255].
# You need to convert them to float32 with values in the range [0, 1]
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000)
return train_dataset
def dataset_fn(global_batch_size, input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = mnist_dataset(batch_size)
dataset = dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
return dataset
def build_cnn_model():
return tf.keras.Sequential([
tf.keras.Input(shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
Writing mnist.py
Многопользовательская конфигурация
Теперь давайте войдем в мир обучения нескольких сотрудников. В TF_CONFIG
переменная среды TF_CONFIG необходима для обучения на нескольких машинах, каждая из которых может иметь разную роль. TF_CONFIG
, используемый ниже, представляет собой строку JSON, используемую для указания конфигурации кластера для каждого рабочего процесса, который является частью кластера. Это метод по умолчанию для указания кластера с использованием cluster_resolver.TFConfigClusterResolver
, но в модуле distribute.cluster_resolver
доступны и другие параметры.
Опишите свой кластер
Вот пример конфигурации:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
Вот тот же TF_CONFIG
, сериализованный как строка JSON:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
Есть два компонента TF_CONFIG
: cluster
и task
.
cluster
одинаков для всех рабочих и предоставляет информацию об обучающем кластере, который представляет собой набор, состоящий из различных типов заданий, таких какworker
. При обучении нескольких рабочих с помощьюMultiWorkerMirroredStrategy
обычно есть одинworker
, который берет на себя немного больше обязанностей, таких как сохранение контрольной точки и написание сводного файла для TensorBoard, в дополнение к тому, что делает обычныйworker
. Такой воркер называетсяchief
воркером, и принято, чтоworker
сindex
0 назначается главнымworker
(собственно так и реализованtf.distribute.Strategy
).task
предоставляет информацию о текущей задаче и отличается для каждого работника. Он указываетtype
иindex
этого работника.
В этом примере вы устанавливаете type
задачи "worker"
, а index
задачи — 0
. Эта машина является первым рабочим и будет назначена главным рабочим и будет выполнять больше работы, чем другие. Обратите внимание, что на других машинах также должна быть установлена переменная среды TF_CONFIG
, и она должна иметь тот же cluster
словарь, но другой type
задачи или index
задачи в зависимости от ролей этих машин.
В целях иллюстрации в этом руководстве показано, как можно установить TF_CONFIG
с двумя рабочими процессами на localhost
хосте. На практике пользователи должны создавать несколько рабочих процессов на внешних IP-адресах/портах и соответствующим образом устанавливать TF_CONFIG
для каждого рабочего процесса.
В этом примере вы будете использовать 2 воркера, TF_CONFIG первого TF_CONFIG
показан выше. Для второго рабочего вы должны установить tf_config['task']['index']=1
Выше tf_config
— это просто локальная переменная в python. Чтобы фактически использовать его для настройки обучения, этот словарь необходимо сериализовать как JSON и поместить в переменную среды TF_CONFIG
.
Переменные среды и подпроцессы в блокнотах
Подпроцессы наследуют переменные среды от своего родителя. Итак, если вы установите переменную среды в этом jupyter notebook
:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
Вы можете получить доступ к переменной среды из подпроцессов:
echo ${GREETINGS}
Hello TensorFlow!
В следующем разделе вы будете использовать это для передачи TF_CONFIG
рабочим подпроцессам. На самом деле вы никогда не запустите свои задания таким образом, но этого достаточно для целей этого руководства: продемонстрировать минимальный пример с несколькими рабочими.
MultiWorkerMirroredСтратегии
Для обучения модели используйте экземпляр tf.distribute.MultiWorkerMirroredStrategy
, который создает копии всех переменных в слоях модели на каждом устройстве для всех рабочих процессов. Руководство tf.distribute.Strategy
содержит более подробную информацию об этой стратегии.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO 2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
Используйте tf.distribute.Strategy.scope
, чтобы указать, что стратегия должна использоваться при построении вашей модели. Это помещает вас в « контекст между репликами » для этой стратегии, что означает, что стратегия получает контроль над такими вещами, как размещение переменных.
import mnist
with strategy.scope():
# Model building needs to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
Автоматическое разделение ваших данных между работниками
При обучении нескольких рабочих сегментирование набора данных не обязательно требуется, однако оно дает вам семантику ровно один раз, что делает дальнейшее обучение более воспроизводимым, т. е. обучение на нескольких рабочих должно быть таким же, как обучение на одном рабочем. Примечание. В некоторых случаях производительность может быть снижена.
См.: distribute_datasets_from_function
per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
with strategy.scope():
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
Определите пользовательский цикл обучения и обучите модель
Укажите оптимизатор
with strategy.scope():
# The creation of optimizer and train_accuracy will need to be in
# `strategy.scope()` as well, since they create variables.
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
Определите шаг обучения с помощью tf.function
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
Сохранение и восстановление контрольной точки
Реализация контрольных точек в пользовательском цикле обучения требует, чтобы пользователь обрабатывал ее вместо использования обратного вызова keras. Это позволяет вам сохранять веса модели и восстанавливать их, не сохраняя всю модель.
from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and "chief" not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
Здесь вы создадите одну tf.train.Checkpoint
, которая отслеживает модель, которой управляет tf.train.CheckpointManager
, так что сохраняется только последняя контрольная точка.
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
Теперь, когда вам нужно восстановить, вы можете найти последнюю сохраненную контрольную точку с помощью удобной функции tf.train.latest_checkpoint
.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
После восстановления контрольной точки вы можете продолжить обучение индивидуальному циклу обучения.
num_epochs = 3
num_steps_per_epoch = 70
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
# Once the `CheckpointManager` is set up, you're now ready to save, and remove
# the checkpoints non-chief workers saved.
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.849107, train_loss: 0.491886. Epoch: 1, accuracy: 0.937835, train_loss: 0.197650. Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.
Полная настройка кода на воркерах
Чтобы на самом деле работать с MultiWorkerMirroredStrategy
, вам нужно запустить рабочие процессы и передать им TF_CONFIG
.
Как и файл mnist.py
, написанный ранее, вот main.py
, который содержит тот же код, который мы шаг за шагом проходили ранее в этом совместном проекте, мы просто записываем его в файл, чтобы каждый из рабочих выполнял его:
Файл: main.py
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist
from multiprocessing import util
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
num_epochs = 3
num_steps_per_epoch=70
# Checkpoint saving and restoring
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and 'chief' not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
# Define Strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id, cluster_spec = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id,
strategy.cluster_resolver.cluster_spec())
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
# Restoring the checkpoint
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
# Resume our CTL training
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
Writing main.py
Тренируйтесь и оценивайте
Текущий каталог теперь содержит оба файла Python:
ls *.py
main.py mnist.py
Итак, json-сериализуйте TF_CONFIG
и добавьте его в переменные среды:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Теперь вы можете запустить рабочий процесс, который будет запускать main.py
и использовать TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
Есть несколько замечаний по поводу приведенной выше команды:
- Он использует
%%bash
, который является «волшебством» ноутбука для запуска некоторых команд bash. - Он использует флаг
--bg
для запуска процессаbash
в фоновом режиме, потому что этот рабочий процесс не завершится. Он ждет всех рабочих перед тем, как начать.
Фоновый рабочий процесс не будет печатать вывод в этот блокнот, поэтому &>
перенаправляет вывод в файл, чтобы вы могли видеть, что произошло.
Итак, подождите несколько секунд, пока процесс запустится:
import time
time.sleep(20)
Теперь посмотрите, что было выведено в файл журнала рабочего процесса:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
В последней строке файла журнала должно быть написано: Started server with target: grpc://localhost:12345
. Первый рабочий теперь готов и ждет, пока все остальные рабочие будут готовы продолжить работу.
Итак, обновите tf_config
для второго рабочего процесса:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Теперь запустите второго рабочего. Это запустит обучение, так как все рабочие активны (поэтому нет необходимости фонировать этот процесс):
python main.py > /dev/null 2>&1
Теперь, если вы перепроверите журналы, написанные первым рабочим, вы увидите, что он участвовал в обучении этой модели:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration 2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.832589, train_loss: 0.531260. Epoch: 1, accuracy: 0.936161, train_loss: 0.214774. Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
Углубленное обучение нескольких сотрудников
В этом руководстве продемонстрирован рабочий процесс Custom Training Loop
для настройки с несколькими работниками. Подробное описание других тем доступно в руководстве model.fit's guide
настройке нескольких рабочих и применимо к CTL.
Смотрите также
- В руководстве по распределенному обучению TensorFlow представлен обзор доступных стратегий распространения.
- Официальные модели , многие из которых можно настроить для запуска нескольких стратегий распространения.
- Раздел « Производительность» в руководстве содержит информацию о других стратегиях и инструментах , которые вы можете использовать для оптимизации производительности ваших моделей TensorFlow.