Посмотреть на TensorFlow.org | Запустить в Google Colab | Посмотреть исходный код на GitHub | Скачать блокнот |
Обзор
Обучение сервера параметров — это распространенный метод параллельного анализа данных для масштабирования обучения модели на нескольких машинах.
Учебный кластер сервера параметров состоит из рабочих процессов и серверов параметров . Переменные создаются на серверах параметров, и они считываются и обновляются работниками на каждом этапе. По умолчанию рабочие процессы читают и обновляют эти переменные независимо друг от друга, не синхронизируясь друг с другом. Вот почему иногда обучение параметров в стиле сервера называют асинхронным обучением .
В TensorFlow 2 обучение сервера параметров обеспечивается классом tf.distribute.experimental.ParameterServerStrategy
, который распределяет шаги обучения по кластеру, который масштабируется до тысяч рабочих (в сопровождении серверов параметров).
Поддерживаемые методы обучения
Существует два основных поддерживаемых метода обучения:
-
Model.fit
API, который рекомендуется, если вы предпочитаете высокоуровневую абстракцию и обработку обучения. - Пользовательский цикл обучения (дополнительные сведения см. в разделе Пользовательское обучение , Написание цикла обучения с нуля и Пользовательский цикл обучения с помощью Keras и MultiWorkerMirroredStrategy .) Обучение пользовательскому циклу рекомендуется, если вы предпочитаете определять детали своего цикла обучения.
Кластер с заданиями и задачами
Вне зависимости от выбранного API ( Model.fit
или настраиваемый цикл обучения), распределенное обучение в TensorFlow 2 предполагает: 'cluster'
с несколькими 'jobs'
, причем у каждого из заданий может быть одна или несколько 'tasks'
.
При использовании обучения сервера параметров рекомендуется иметь:
- Одна должность координатора (имеющая название должности
chief
) - Несколько рабочих мест (название рабочего места
worker
); и - Несколько заданий сервера параметров (имя задания
ps
)
В то время как координатор создает ресурсы, отправляет обучающие задачи, записывает контрольные точки и обрабатывает сбои задач, рабочие процессы и серверы параметров запускают tf.distribute.Server
, который прослушивает запросы от координатора.
Обучение сервера параметров с помощью Model.fit
API
Для обучения сервера параметров с помощью API Model.fit
координатор должен использовать объект tf.distribute.experimental.ParameterServerStrategy
и tf.keras.utils.experimental.DatasetCreator
в качестве входных данных. Подобно использованию Model.fit
без стратегии или с другими стратегиями, рабочий процесс включает создание и компиляцию модели, подготовку обратных вызовов, за которыми следует вызов Model.fit
.
Обучение сервера параметров с пользовательским циклом обучения
В настраиваемых циклах обучения класс tf.distribute.experimental.coordinator.ClusterCoordinator
является ключевым компонентом, используемым для координатора.
- Класс
ClusterCoordinator
должен работать в сочетании с объектомtf.distribute.Strategy
. - Этот объект
tf.distribute.Strategy
необходим для предоставления информации о кластере и используется для определения шага обучения, как показано в пользовательском обучении с помощью tf.distribute.Strategy . - Затем объект
ClusterCoordinator
отправляет выполнение этих шагов обучения удаленным работникам. - Для обучения сервера параметров
ClusterCoordinator
должен работать сtf.distribute.experimental.ParameterServerStrategy
.
Самый важный API, предоставляемый объектом ClusterCoordinator
, — это schedule
:
- API
schedule
ставит в очередь функциюtf.function
и немедленно возвращаетRemoteValue
значение будущего. - Функции в очереди будут отправлены удаленным работникам в фоновых потоках, а их
RemoteValue
будут заполнены асинхронно. - Поскольку
schedule
не требует назначения рабочего процесса, переданнаяtf.function
может быть выполнена на любом доступном рабочем процессе. - Если рабочий процесс, на котором он выполняется, становится недоступным до его завершения, функция будет повторно запущена на другом доступном рабочем потоке.
- Из-за этого факта и того факта, что выполнение функции не является атомарным, функция может выполняться более одного раза.
В дополнение к диспетчеризации удаленных функций ClusterCoordinator
также помогает создавать наборы данных для всех рабочих процессов и перестраивать эти наборы данных, когда рабочий процесс восстанавливается после сбоя.
Настройка учебника
Учебное пособие будет разветвлено на Model.fit
и пользовательские пути цикла обучения, и вы сможете выбрать тот, который соответствует вашим потребностям. Разделы, отличные от «Обучение с X», применимы к обоим путям.
pip install portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
Настройка кластера
Как упоминалось выше, для обучающего кластера сервера параметров требуется задача-координатор, которая запускает вашу обучающую программу, одна или несколько рабочих задач и задач сервера параметров, которые запускают серверы TensorFlow — tf.distribute.Server
— и, возможно, дополнительная задача оценки, которая запускает дополнительную оценку. (см. раздел оценки коляски ниже). К их установке предъявляются следующие требования:
- Задача координатора должна знать адреса и порты всех остальных серверов TensorFlow, кроме оценщика.
- Рабочие процессы и серверы параметров должны знать, какой порт им нужно прослушивать. Для простоты вы обычно можете передавать полную информацию о кластере при создании серверов TensorFlow для этих задач.
- Задача оценщика не должна знать настройку обучающего кластера. Если это так, он не должен пытаться подключиться к обучающему кластеру.
- Рабочие процессы и серверы параметров должны иметь типы задач
"worker"
и"ps"
соответственно. Координатор должен использовать"chief"
в качестве типа задачи по устаревшим причинам.
В этом руководстве вы создадите внутрипроцессный кластер, чтобы все обучение сервера параметров можно было запустить в Colab. В следующем разделе вы узнаете, как настроить настоящие кластеры .
Внутрипроцессный кластер
Вы начнете с создания нескольких серверов TensorFlow заранее и подключитесь к ним позже. Обратите внимание, что это только для целей демонстрации этого руководства, и в реальном обучении серверы будут запускаться на машинах "worker"
и "ps"
.
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name="worker",
task_index=i,
config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec,
job_name="ps",
task_index=i,
protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
Внутрипроцессная настройка кластера часто используется при модульном тестировании, например здесь .
Другой вариант локального тестирования — запустить процессы на локальном компьютере — пример такого подхода см. в разделе Обучение нескольких рабочих с помощью Keras .
Создание экземпляра ParameterServerStrategy
Прежде чем погрузиться в обучающий код, давайте создадим экземпляр объекта ParameterServerStrategy
. Обратите внимание, что это необходимо независимо от того, используете ли вы Model.fit
или настраиваемый цикл обучения. Аргумент variable_partitioner
будет объяснен в разделе Шардинг переменных .
variable_partitioner = (
tf.distribute.experimental.partitioners.MinSizePartitioner(
min_shard_bytes=(256 << 10),
max_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0' INFO:tensorflow:Number of GPUs on workers: 1
Чтобы использовать графические процессоры для обучения, выделите графические процессоры, видимые для каждого рабочего процесса. ParameterServerStrategy
будет использовать все доступные графические процессоры для каждого рабочего процесса с тем ограничением, что все рабочие процессы должны иметь одинаковое количество доступных графических процессоров.
Шардинг переменных
Разделение переменных означает разделение переменной на несколько меньших переменных, которые называются сегментами . Шардинг переменных может быть полезен для распределения сетевой нагрузки при доступе к этим сегментам. Также полезно распределить вычисление и хранение нормальной переменной между несколькими серверами параметров.
Чтобы включить сегментирование переменных, вы можете передать variable_partitioner
при создании объекта ParameterServerStrategy
. variable_partitioner
будет вызываться каждый раз, когда создается переменная, и ожидается, что он вернет количество осколков по каждому измерению переменной. Предоставляются некоторые готовые variable_partitioner
, такие как tf.distribute.experimental.partitioners.MinSizePartitioner
. Рекомендуется использовать разделители на основе размера, такие как tf.distribute.experimental.partitioners.MinSizePartitioner
, чтобы избежать разделения небольших переменных, что может негативно сказаться на скорости обучения модели.
Когда variable_partitioner
передается и если вы создаете переменную непосредственно в strategy.scope()
, она станет типом контейнера со свойством variables
, которое обеспечивает доступ к списку осколков. В большинстве случаев этот контейнер будет автоматически преобразован в тензор путем объединения всех осколков. В результате его можно использовать как обычную переменную. С другой стороны, некоторые методы TensorFlow, такие как tf.nn.embedding_lookup
, обеспечивают эффективную реализацию для этого типа контейнера, и в этих методах можно избежать автоматической конкатенации.
Дополнительные сведения см. в документации API tf.distribute.experimental.ParameterServerStrategy
.
Тренировка с Model.fit
Keras предоставляет простой в использовании API для обучения через Model.fit
, который обрабатывает цикл обучения под капотом, с гибкостью переопределяемого train_step
и обратными вызовами, которые предоставляют такие функции, как сохранение контрольных точек или сохранение сводки для TensorBoard. С Model.fit
тот же обучающий код можно использовать для других стратегий с простой заменой объекта стратегии.
Входные данные
Model.fit
с обучением сервера параметров требует, чтобы входные данные были предоставлены в вызываемом объекте, который принимает один аргумент типа tf.distribute.InputContext
и возвращает tf.data.Dataset
. Затем создайте объект tf.keras.utils.experimental.DatasetCreator
, который принимает такой callable
, и необязательный объект tf.distribute.InputOptions
через аргумент input_options
.
Обратите внимание, что рекомендуется перемешивать и повторять данные с параметрами обучения сервера, а также указывать steps_per_epoch
в вызове fit
, чтобы библиотека знала границы эпох.
Дополнительные сведения об аргументе InputContext
см. в руководстве по распределенному вводу .
def dataset_fn(input_context):
global_batch_size = 64
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.shard(
input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2)
return dataset
dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
Код в dataset_fn
будет вызываться на устройстве ввода, которым обычно является ЦП, на каждой из рабочих машин.
Построение и компиляция модели
Теперь вы создадите tf.keras.Model
— тривиальную модель tf.keras.models.Sequential
для демонстрационных целей — с последующим вызовом Model.compile
для включения компонентов, таких как оптимизатор, метрики или параметры, такие как steps_per_execution
:
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)
Обратные звонки и обучение
Прежде чем вызывать model.fit
для фактического обучения, давайте подготовим необходимые обратные вызовы для общих задач, таких как:
-
ModelCheckpoint
: для сохранения веса модели. -
BackupAndRestore
: чтобы убедиться, что ход обучения автоматически резервируется и восстанавливается, если кластер становится недоступным (например, прерывание или вытеснение); или -
TensorBoard
: для сохранения отчетов о ходе выполнения в сводные файлы, которые визуализируются в инструменте TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=log_dir),
tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step Epoch 2/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step Epoch 3/5 WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. 20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step Epoch 4/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step Epoch 5/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step <keras.callbacks.History at 0x7f89984ca890>
Прямое использование с ClusterCoordinator
(необязательно)
Даже если вы выберете путь обучения Model.fit
, вы можете дополнительно создать экземпляр объекта tf.distribute.experimental.coordinator.ClusterCoordinator
, чтобы запланировать другие функции, которые вы хотели бы выполнять на рабочих процессах. Дополнительную информацию и примеры см. в разделе « Тренировка с пользовательским циклом обучения ».
Обучение с пользовательским циклом обучения
Использование настраиваемых циклов обучения с tf.distribute.Strategy
обеспечивает большую гибкость при определении циклов обучения. С ParameterServerStrategy
, определенным выше (как strategy
), вы будете использовать tf.distribute.experimental.coordinator.ClusterCoordinator
для отправки выполнения шагов обучения удаленным работникам.
Затем вы создадите модель, определите набор данных и ступенчатую функцию, как вы делали это в цикле обучения с другими tf.distribute.Strategy
s. Вы можете найти более подробную информацию в учебнике Custom training with tf.distribute.Strategy .
Чтобы обеспечить эффективную предварительную выборку наборов данных, используйте рекомендуемые API-интерфейсы для создания распределенных наборов данных, упомянутые в разделе « Этапы обучения отправки удаленным работникам » ниже. Кроме того, обязательно вызовите Strategy.run
внутри worker_fn
, чтобы в полной мере использовать графические процессоры, выделенные для рабочих процессов. Остальные шаги одинаковы для обучения с использованием графических процессоров или без них.
Давайте создадим эти компоненты, выполнив следующие шаги:
Настройте данные
Во-первых, напишите функцию, которая создает набор данных, включающий логику предварительной обработки, реализованную слоями предварительной обработки Keras .
Вы создадите эти слои вне dataset_fn
но примените преобразование внутри dataset_fn
, так как вы dataset_fn
в tf.function
, который не позволяет создавать переменные внутри него.
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=feature_vocab,
mask_token=None)
label_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = tf.keras.layers.Input(
shape=(3,),
dtype=tf.string,
name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = tf.keras.Model(
{"features": raw_feature_input},
feature_id_input)
raw_label_input = tf.keras.layers.Input(
shape=(1,),
dtype=tf.string,
name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = tf.keras.Model(
{"label": raw_label_input},
label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison return bool(asarray(a1 == a2).all())
Создайте примеры игрушек в наборе данных:
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
Затем создайте обучающий набор данных, завернутый в dataset_fn
:
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
Построить модель
Далее создайте модель и другие объекты. Обязательно создайте все переменные в strategy.scope
.
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with Keras processing layers.
model_input = tf.keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = tf.keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
model = tf.keras.Model({"features": model_input}, dense_output)
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
accuracy = tf.keras.metrics.Accuracy()
Давайте подтвердим, что использование FixedShardsPartitioner
разделило все переменные на два сегмента, и каждый сегмент был назначен разным серверам параметров:
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
Определите этап обучения
В-третьих, создайте этап обучения, завернутый в tf.function
:
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
В приведенной выше функции шага обучения вызов Strategy.run
и Strategy.reduce
в step_fn
может поддерживать несколько графических процессоров на одного рабочего. Если рабочим выделены графические процессоры, Strategy.run
распределит наборы данных по нескольким репликам.
Отправка шагов обучения удаленным работникам
После того, как все вычисления определены ParameterServerStrategy
, вы будете использовать класс tf.distribute.experimental.coordinator.ClusterCoordinator
для создания ресурсов и распространения этапов обучения среди удаленных сотрудников.
Давайте сначала создадим объект ClusterCoordinator
и передадим объект стратегии:
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
Затем создайте набор данных для каждого работника и итератор. В приведенном ниже per_worker_dataset_fn
рекомендуется оборачивать dataset_fn
в strategy.distribute_datasets_from_function
, чтобы обеспечить беспрепятственную эффективную предварительную выборку на графические процессоры.
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Последним шагом является распределение вычислений между удаленными работниками с помощью ClusterCoordinator.schedule
:
- Метод
schedule
ставит в очередь функциюtf.function
и немедленно возвращает удаленное значение, похожее наRemoteValue
. Функции в очереди будут отправляться удаленным работникам в фоновых потоках, аRemoteValue
будет заполняться асинхронно. - Метод
join
(ClusterCoordinator.join
) можно использовать для ожидания выполнения всех запланированных функций.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). Finished epoch 0, accuracy is 0.543750. Finished epoch 1, accuracy is 0.543750. Finished epoch 2, accuracy is 0.950000. Finished epoch 3, accuracy is 1.000000.
Вот как вы можете получить результат RemoteValue
:
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000
Как вариант, можно запустить все шаги и что-то делать, ожидая завершения:
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
Полный рабочий процесс обучения и обслуживания для этого конкретного примера см. в этом тесте .
Подробнее о создании набора данных
Набор данных в приведенном выше коде создается с помощью ClusterCoordinator.create_per_worker_dataset
API). Он создает один набор данных для каждого работника и возвращает объект-контейнер. Вы можете вызвать для него метод iter
, чтобы создать итератор для каждого рабочего. Итератор для каждого рабочего процесса содержит один итератор для каждого рабочего процесса, и соответствующий фрагмент рабочего процесса будет заменен во входном аргументе функции, переданной методу ClusterCoordinator.schedule
, прежде чем функция будет выполнена на конкретном рабочем процессе.
В настоящее время метод ClusterCoordinator.schedule
предполагает, что рабочие процессы эквивалентны, и, следовательно, предполагает, что наборы данных для разных рабочих процессов одинаковы, за исключением того, что они могут перемешиваться по-разному, если они содержат операцию Dataset.shuffle
. Из-за этого также рекомендуется, чтобы наборы данных повторялись бесконечно, и вы планировали конечное количество шагов вместо того, чтобы полагаться на OutOfRangeError
из набора данных.
Еще одно важное замечание: наборы данных tf.data
не поддерживают неявную сериализацию и десериализацию за пределами задач. Поэтому важно создать весь набор данных внутри функции, переданной в ClusterCoordinator.create_per_worker_dataset
.
Оценка
Существует несколько способов определения и запуска цикла оценки в распределенном обучении. Каждый из них имеет свои плюсы и минусы, как описано ниже. Метод встроенной оценки рекомендуется, если у вас нет предпочтений.
Встроенная оценка
В этом методе координатор чередует обучение и оценку, поэтому он называется встроенной оценкой .
Есть несколько преимуществ встроенной оценки. Например:
- Он может поддерживать большие модели оценки и наборы данных оценки, которые не может удержать одна задача.
- Результаты оценки могут быть использованы для принятия решений по обучению следующей эпохи.
Существует два способа реализации встроенной оценки: прямая оценка и распределенная оценка.
- Прямая оценка : для небольших моделей и наборов данных для оценки координатор может запустить оценку непосредственно для распределенной модели с набором данных для оценки на координаторе:
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = tf.keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
- Распределенная оценка . Для больших моделей или наборов данных, которые невозможно запустить непосредственно на координаторе, задача координатора может распределять задачи оценки среди рабочих с помощью методов
ClusterCoordinator.schedule
/ClusterCoordinator.join
:
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = tf.keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources Evaluation accuracy: 1.000000
Оценка коляски
Другой метод называется дополнительной оценкой , когда вы создаете специальную задачу оценщика, которая неоднократно считывает контрольные точки и запускает оценку последней контрольной точки. Это позволяет завершить программу обучения раньше, если вам не нужно менять цикл обучения на основе результатов оценки. Однако для запуска оценки требуется дополнительная задача оценщика и периодические контрольные точки. Ниже приведен возможный цикл оценки боковой машины:
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epoches)):
break
Кластеры в реальном мире
В реальной производственной среде вы будете запускать все задачи в разных процессах на разных машинах. Самый простой способ настроить информацию о кластере для каждой задачи — установить переменные среды "TF_CONFIG"
и использовать tf.distribute.cluster_resolver.TFConfigClusterResolver
для разбора "TF_CONFIG"
.
Общее описание переменных среды "TF_CONFIG"
см. в учебном руководстве по распределенной среде.
Если вы начинаете свои обучающие задачи, используя Kubernetes или другие шаблоны конфигурации, весьма вероятно, что эти шаблоны уже установили для вас “TF_CONFIG"
.
Установите переменную среды "TF_CONFIG"
Предположим, у вас есть 3 воркера и 2 сервера параметров, "TF_CONFIG"
1 может быть:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
"TF_CONFIG"
оценщика может быть:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
Часть "cluster"
в приведенной выше "TF_CONFIG"
для оценщика является необязательной.
Если вы используете один и тот же бинарник для всех задач
Если вы предпочитаете выполнять все эти задачи, используя один двоичный файл, вам нужно будет разрешить вашей программе ветвление на разные роли в самом начале:
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# Run side-car evaluation
else:
# Run the coordinator.
Следующий код запускает сервер TensorFlow и ожидает:
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
Обработка сбоя задачи
Сбой работника
tf.distribute.experimental.coordinator.ClusterCoordinator
или Model.fit
обеспечивают встроенную отказоустойчивость при сбое рабочего процесса. После восстановления рабочего процесса ранее предоставленная функция набора данных (либо ClusterCoordinator.create_per_worker_dataset
для пользовательского цикла обучения, либо tf.keras.utils.experimental.DatasetCreator
для Model.fit
) будет вызываться для рабочих процессов для повторного создания наборов данных.
Сбой сервера параметров или координатора
Однако, когда координатор увидит ошибку сервера параметров, он немедленно вызовет ошибку UnavailableError
или AbortedError
. В этом случае вы можете перезапустить координатор. Сам координатор также может стать недоступным. Поэтому рекомендуется определенный инвентарь, чтобы не потерять тренировочный прогресс:
Для
Model.fit
следует использовать обратный вызовBackupAndRestore
, который автоматически обрабатывает сохранение и восстановление хода выполнения. Пример см. в разделе « Обратные вызовы и обучение » выше.Для пользовательского цикла обучения следует периодически проверять переменные модели и загружать переменные модели из контрольной точки, если таковая имеется, перед началом обучения. О прогрессе обучения можно приблизительно судить по
optimizer.iterations
, если оптимизатор имеет контрольную точку:
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epoches):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
Получение RemoteValue
Получение RemoteValue
гарантированно будет успешным, если функция выполнена успешно. Это связано с тем, что в настоящее время возвращаемое значение сразу же копируется в координатор после выполнения функции. Если во время копирования произойдет сбой рабочего процесса, функция будет повторно применена к другому доступному рабочему процессу. Поэтому, если вы хотите оптимизировать производительность, вы можете планировать функции без возвращаемого значения.
Отчет об ошибках
Как только координатор увидит ошибку, такую как UnavailableError
от серверов параметров, или другие ошибки приложения, такие как InvalidArgument
от tf.debugging.check_numerics
, он отменит все ожидающие и поставленные в очередь функции, прежде чем вызвать ошибку. Извлечение их соответствующих RemoteValue
вызовет CancelledError
.
После возникновения ошибки координатор не будет вызывать ту же ошибку или какую-либо ошибку из отмененных функций.
Улучшение производительности
Существует несколько возможных причин возникновения проблем с производительностью при обучении с помощью ParameterServerStrategy
и ClusterResolver
.
Одной из распространенных причин является несбалансированная нагрузка на серверы параметров, а некоторые сильно загруженные серверы параметров исчерпали свои возможности. Также может быть несколько основных причин. Вот несколько простых способов смягчить эту проблему:
- Разделите большие переменные модели, указав
variable_partitioner
при построенииParameterServerStrategy
. - По возможности избегайте создания переменной точки доступа, которая требуется для всех серверов параметров, за один шаг. Например, используйте постоянную скорость обучения или подкласс
tf.keras.optimizers.schedules.LearningRateSchedule
в оптимизаторах, поскольку поведение по умолчанию заключается в том, что скорость обучения становится переменной, помещаемой на определенный сервер параметров и запрашиваемой всеми другими серверами параметров на каждом этапе. . - Перемешайте свои большие словари, прежде чем передавать их слоям предварительной обработки Keras.
Еще одна возможная причина проблем с производительностью — координатор. Ваша первая реализация schedule
/ join
основана на Python и, следовательно, может иметь накладные расходы на многопоточность. Также задержка между координатором и работниками может быть большой. Если это так,
Для
Model.fit
вы можете установить для аргументаsteps_per_execution
, предоставленного вModel.compile
, значение больше 1.Для пользовательского цикла обучения вы можете упаковать несколько шагов в одну
tf.function
:
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
Поскольку библиотека оптимизирована, мы надеемся, что большинству пользователей в будущем не придется вручную упаковывать шаги.
Кроме того, небольшая хитрость для повышения производительности заключается в том, чтобы запланировать функции без возвращаемого значения, как описано выше в разделе обработки сбоя задачи.
Известные ограничения
Большинство известных ограничений уже рассмотрены в предыдущих разделах. В этом разделе приводится сводка.
Общие ParameterServerStrategy
-
os.environment["grpc_fail_fast"]="use_caller"
необходим для каждой задачи, включая координатора, чтобы обеспечить правильную работу отказоустойчивости. - Синхронное обучение сервера параметров не поддерживается.
- Обычно необходимо упаковать несколько шагов в одну функцию для достижения оптимальной производительности.
- Не поддерживается загрузка сохраненной_модели через
tf.saved_model.load
, содержащую сегментированные переменные. Обратите внимание, что загрузка такой save_model с помощью TensorFlow Serving должна работать. - Не поддерживается загрузка контрольной точки, содержащей сегментированные переменные слота оптимизатора, в другое количество сегментов.
- Восстановление после сбоя сервера параметров без перезапуска задачи координатора не поддерживается.
- Использование
tf.lookup.StaticHashTable
(который обычно используется некоторыми уровнями предварительной обработки Keras, такими какtf.keras.layers.IntegerLookup
,tf.keras.layers.StringLookup
иtf.keras.layers.TextVectorization
) приводит к размещению ресурсов на координатор в это время с сервером параметров обучения. Это влияет на производительность при поиске RPC от рабочих к координатору. В настоящее время это является приоритетной задачей.
Model.fit
-
steps_per_epoch
требуется аргументModel.fit
. Вы можете выбрать значение, которое обеспечивает соответствующие интервалы в эпохе. -
ParameterServerStrategy
не поддерживает пользовательские обратные вызовы, которые имеют вызовы на уровне пакетов по соображениям производительности. Вы должны преобразовать эти вызовы в вызовы уровня эпохи с соответствующим образом выбраннымsteps_per_epoch
, чтобы они вызывались через число шаговsteps_per_epoch
. Встроенные обратные вызовы не затронуты: их вызовы на уровне пакетов были изменены, чтобы повысить производительность. Планируется поддержка вызовов пакетного уровня дляParameterServerStrategy
. - По той же причине, в отличие от других стратегий, индикатор выполнения и метрики регистрируются только на границах эпох.
-
run_eagerly
не поддерживается.
Индивидуальные особенности тренировочного цикла
-
ClusterCoordinator.schedule
не поддерживает гарантии посещения для набора данных. - Когда используется
ClusterCoordinator.create_per_worker_dataset
, весь набор данных должен быть создан внутри переданной ему функции. -
tf.data.Options
игнорируется в наборе данных, созданномClusterCoordinator.create_per_worker_dataset
.