Посмотреть на TensorFlow.org | Запустить в Google Colab | Посмотреть исходный код на GitHub | Скачать блокнот |
API-интерфейсы tf.distribute предоставляют пользователям простой способ масштабировать свое обучение с одной машины на несколько машин. При масштабировании своей модели пользователям также приходится распределять вводимые данные на несколько устройств. tf.distribute
предоставляет API, с помощью которых вы можете автоматически распределять вводимые данные между устройствами.
Это руководство покажет вам различные способы создания распределенного набора данных и итераторов с помощью API tf.distribute
. Дополнительно будут затронуты следующие темы:
- Варианты использования, сегментирования и пакетной обработки при использовании
tf.distribute.Strategy.experimental_distribute_dataset
иtf.distribute.Strategy.distribute_datasets_from_function
. - Различные способы перебора распределенного набора данных.
- Различия между API
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
и APItf.data
, а также любые ограничения, с которыми пользователи могут столкнуться при их использовании.
В этом руководстве не рассматривается использование распределенного ввода с API-интерфейсами Keras.
Распределенные наборы данных
Чтобы использовать API-интерфейсы tf.distribute
для масштабирования, пользователям рекомендуется использовать tf.data.Dataset
для представления своих входных данных. tf.distribute
был создан для эффективной работы с tf.data.Dataset
(например, автоматическая предварительная выборка данных на каждое ускорительное устройство), а оптимизация производительности регулярно включается в реализацию. Если у вас есть вариант использования чего-то другого, кроме tf.data.Dataset
, обратитесь к следующему разделу этого руководства. В нераспределенном цикле обучения пользователи сначала создают экземпляр tf.data.Dataset
, а затем перебирают элементы. Например:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Чтобы позволить пользователям использовать стратегию tf.distribute
с минимальными изменениями в существующем коде пользователя, были введены два API, которые будут распространять экземпляр tf.data.Dataset
и возвращать объект распределенного набора данных. Затем пользователь может выполнить итерацию по этому экземпляру распределенного набора данных и обучить свою модель, как и раньше. Давайте теперь более подробно рассмотрим два API — tf.distribute.Strategy.experimental_distribute_dataset
и tf.distribute.Strategy.distribute_datasets_from_function
:
tf.distribute.Strategy.experimental_distribute_dataset
использование
Этот API принимает экземпляр tf.data.Dataset
в качестве входных данных и возвращает экземпляр tf.distribute.DistributedDataset
. Вы должны пакетировать входной набор данных со значением, равным глобальному размеру пакета. Этот глобальный размер пакета — это количество образцов, которые вы хотите обработать на всех устройствах за 1 шаг. Вы можете выполнить итерацию по этому распределенному набору данных в стиле Python или создать итератор с помощью iter
. Возвращенный объект не является экземпляром tf.data.Dataset
и не поддерживает никаких других API-интерфейсов, которые каким-либо образом преобразуют или проверяют набор данных. Это рекомендуемый API, если у вас нет конкретных способов разделения входных данных на разные реплики.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>) 2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
Характеристики
Дозирование
tf.distribute
выполняет повторное пакетирование входного экземпляра tf.data.Dataset
с новым размером пакета, равным общему размеру пакета, деленному на количество синхронизированных реплик. Количество синхронизированных реплик равно количеству устройств, которые принимают участие в градиенте allreduce во время обучения. Когда пользователь вызывает next
распределенный итератор, для каждой реплики возвращается размер пакета данных для каждой реплики. Количество элементов в повторно пакетированном наборе данных всегда будет кратно количеству реплик. Вот несколько примеров:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- Без распространения:
- Пакет 1: [0, 1, 2, 3]
- Пакет 2: [4, 5]
С раздачей по 2 реплики. Последняя партия ([4, 5]) делится на 2 реплики.
Пакет 1:
- Реплика 1: [0, 1]
- Реплика 2: [2, 3]
Пакет 2:
- Реплика 2: [4]
- Реплика 2: [5]
tf.data.Dataset.range(4).batch(4)
- Без распространения:
- Пакет 1: [[0], [1], [2], [3]]
- При распределении по 5 репликам:
- Пакет 1:
- Реплика 1: [0]
- Реплика 2: [1]
- Реплика 3: [2]
- Реплика 4: [3]
- Реплика 5: []
tf.data.Dataset.range(8).batch(4)
- Без распространения:
- Пакет 1: [0, 1, 2, 3]
- Пакет 2: [4, 5, 6, 7]
- При распределении по 3 репликам:
- Пакет 1:
- Реплика 1: [0, 1]
- Реплика 2: [2, 3]
- Реплика 3: []
- Пакет 2:
- Реплика 1: [4, 5]
- Реплика 2: [6, 7]
- Реплика 3: []
Повторное пакетирование набора данных имеет пространственную сложность, которая линейно увеличивается с количеством реплик. Это означает, что в случае использования обучения с несколькими работниками входной конвейер может столкнуться с ошибками OOM.
Разделение
tf.distribute
также автоматически разбивает входной набор данных при обучении нескольких рабочих с помощью MultiWorkerMirroredStrategy
и TPUStrategy
. Каждый набор данных создается на процессорном устройстве рабочего. Автоматическое разбиение набора данных на набор рабочих процессов означает, что каждому рабочему процессу назначается подмножество всего набора данных (если установлено правильное tf.data.experimental.AutoShardPolicy
). Это делается для того, чтобы на каждом этапе глобальный пакет неперекрывающихся элементов набора данных обрабатывался каждым рабочим. Autosharding имеет несколько различных параметров, которые можно указать с помощью tf.data.experimental.DistributeOptions
. Обратите внимание, что в обучении нескольких сотрудников с помощью ParameterServerStrategy
нет автоматического сегментирования, и дополнительную информацию о создании набора данных с помощью этой стратегии можно найти в учебнике по стратегии сервера параметров .
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
Есть три разных параметра, которые вы можете установить для tf.data.experimental.AutoShardPolicy
:
- АВТО: это параметр по умолчанию, который означает, что будет предпринята попытка сегментирования по ФАЙЛУ. Попытка разбиения по ФАЙЛУ завершается неудачно, если набор данных на основе файлов не обнаружен.
tf.distribute
вернется к сегментированию по ДАННЫМ. Обратите внимание, что если входной набор данных основан на файлах, но количество файлов меньше, чем количество рабочих, будетInvalidArgumentError
. В этом случае явно задайте для политики значениеAutoShardPolicy.DATA
или разделите источник входных данных на файлы меньшего размера, чтобы количество файлов превышало количество рабочих процессов. ФАЙЛ: Это вариант, если вы хотите разделить входные файлы на все рабочие процессы. Вы должны использовать эту опцию, если количество входных файлов намного больше, чем количество рабочих процессов, и данные в файлах распределены равномерно. Недостатком этого варианта является наличие простаивающих рабочих, если данные в файлах распределены неравномерно. Если количество файлов меньше количества рабочих процессов, будет
InvalidArgumentError
. В этом случае явно задайте для политики значениеAutoShardPolicy.DATA
. Например, давайте распределим 2 файла по 2 воркерам с 1 репликой каждый. Файл 1 содержит [0, 1, 2, 3, 4, 5], а файл 2 содержит [6, 7, 8, 9, 10, 11]. Пусть общее количество синхронизированных реплик равно 2, а размер глобального пакета равен 4.- Рабочий 0:
- Пакет 1 = Реплика 1: [0, 1]
- Пакет 2 = Реплика 1: [2, 3]
- Пакет 3 = Реплика 1: [4]
- Пакет 4 = Реплика 1: [5]
- Рабочий 1:
- Пакет 1 = Реплика 2: [6, 7]
- Пакет 2 = Реплика 2: [8, 9]
- Пакет 3 = Реплика 2: [10]
- Партия 4 = Реплика 2: [11]
ДАННЫЕ: это приведет к автоматическому разбиению элементов по всем рабочим процессам. Каждый из рабочих будет читать весь набор данных и обрабатывать только назначенный ему сегмент. Все остальные осколки будут удалены. Это обычно используется, если количество входных файлов меньше, чем количество рабочих процессов, и вы хотите лучше разделить данные по всем рабочим процессам. Недостатком является то, что весь набор данных будет прочитан на каждом воркере. Например, давайте распределим 1 файл по 2 рабочим. Файл 1 содержит [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Пусть общее количество синхронизированных реплик равно 2.
- Рабочий 0:
- Пакет 1 = Реплика 1: [0, 1]
- Пакет 2 = Реплика 1: [4, 5]
- Пакет 3 = Реплика 1: [8, 9]
- Рабочий 1:
- Пакет 1 = Реплика 2: [2, 3]
- Пакет 2 = Реплика 2: [6, 7]
- Пакет 3 = Реплика 2: [10, 11]
ВЫКЛ: если вы отключите автошардинг, каждый воркер будет обрабатывать все данные. Например, давайте распределим 1 файл по 2 рабочим. Файл 1 содержит [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Пусть общее количество реплик в синхронизации равно 2. Тогда каждый воркер увидит следующее распределение:
- Рабочий 0:
- Пакет 1 = Реплика 1: [0, 1]
- Пакет 2 = Реплика 1: [2, 3]
- Пакет 3 = Реплика 1: [4, 5]
- Пакет 4 = Реплика 1: [6, 7]
- Пакет 5 = Реплика 1: [8, 9]
Пакет 6 = Реплика 1: [10, 11]
Рабочий 1:
Пакет 1 = Реплика 2: [0, 1]
Пакет 2 = Реплика 2: [2, 3]
Пакет 3 = Реплика 2: [4, 5]
Пакет 4 = Реплика 2: [6, 7]
Пакет 5 = Реплика 2: [8, 9]
Пакет 6 = Реплика 2: [10, 11]
Предварительная загрузка
По умолчанию tf.distribute
добавляет преобразование предварительной выборки в конце предоставленного пользователем экземпляра tf.data.Dataset
. Аргумент преобразования предварительной выборки, который равен buffer_size
, равен количеству синхронизированных реплик.
tf.distribute.Strategy.distribute_datasets_from_function
использование
Этот API принимает функцию ввода и возвращает экземпляр tf.distribute.DistributedDataset
. Функция ввода, которую передают пользователи, имеет аргумент tf.distribute.InputContext
и должна возвращать экземпляр tf.data.Dataset
. С помощью этого API tf.distribute
не вносит никаких дальнейших изменений в пользовательский экземпляр tf.data.Dataset
, возвращаемый функцией ввода. Ответственность за пакетную обработку и сегментацию набора данных лежит на пользователе. tf.distribute
вызывает функцию ввода на процессорном устройстве каждого из воркеров. Помимо предоставления пользователям возможности указывать свою собственную логику пакетной обработки и сегментирования, этот API также демонстрирует лучшую масштабируемость и производительность по сравнению с tf.distribute.Strategy.experimental_distribute_dataset
при использовании для обучения нескольких сотрудников.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Характеристики
Дозирование
Экземпляр tf.data.Dataset
, который является возвращаемым значением входной функции, должен быть упакован с использованием размера пакета для каждой реплики. Размер пакета реплики — это глобальный размер пакета, разделенный на количество реплик, участвующих в обучении синхронизации. Это связано с тем, что tf.distribute
вызывает функцию ввода на процессорном устройстве каждого из рабочих процессов. Набор данных, созданный для данного рабочего процесса, должен быть готов к использованию всеми репликами этого рабочего процесса.
Разделение
Объект tf.distribute.InputContext
, который неявно передается в качестве аргумента пользовательской функции ввода, создается tf.distribute
под капотом. Он содержит информацию о количестве работников, текущем идентификаторе работника и т. д. Эта функция ввода может обрабатывать сегментирование в соответствии с политиками, установленными пользователем, с использованием этих свойств, которые являются частью объекта tf.distribute.InputContext
.
Предварительная загрузка
tf.distribute
не добавляет преобразование предварительной выборки в конце tf.data.Dataset
, возвращаемого предоставленной пользователем функцией ввода.
Распределенные итераторы
Подобно нераспределенным экземплярам tf.data.Dataset
, вам потребуется создать итератор для экземпляров tf.distribute.DistributedDataset
, чтобы перебирать его и получать доступ к элементам в tf.distribute.DistributedDataset
. Ниже приведены способы создания tf.distribute.DistributedIterator
и использования его для обучения модели:
Использование
Используйте конструкцию цикла Pythonic for
Вы можете использовать удобный для пользователя цикл Pythonic для перебора tf.distribute.DistributedDataset
. Элементы, возвращаемые из tf.distribute.DistributedIterator
, могут быть одним tf.Tensor
или tf.distribute.DistributedValues
, который содержит значение для каждой реплики. Размещение цикла внутри tf.function
даст прирост производительности. Однако break
и return
в настоящее время не поддерживаются для цикла по tf.distribute.DistributedDataset
, помещенному внутри tf.function
.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\020TensorDataset:29" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Используйте iter
для создания явного итератора
Чтобы перебирать элементы в экземпляре tf.distribute.DistributedDataset
, вы можете создать tf.distribute.DistributedIterator
, используя для него API iter
. С явным итератором вы можете выполнять итерацию фиксированное количество шагов. Чтобы получить следующий элемент из экземпляра tf.distribute.DistributedIterator
dist_iterator
, вы можете вызвать next(dist_iterator)
, dist_iterator.get_next()
или dist_iterator.get_next_as_optional()
. Первые два по сути одинаковы:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
При использовании next()
или tf.distribute.DistributedIterator.get_next()
, если tf.distribute.DistributedIterator
достиг своего конца, будет выдана ошибка OutOfRange. Клиент может перехватить ошибку на стороне Python и продолжить выполнение другой работы, такой как контрольные точки и оценка. Однако это не сработает, если вы используете цикл обучения хоста (т. е. выполняете несколько шагов для каждой tf.function
), который выглядит так:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
содержит несколько шагов, заключая тело шага в tf.range
. В этом случае разные итерации в цикле без зависимости могут запускаться параллельно, поэтому ошибка OutOfRange может быть вызвана в более поздних итерациях до завершения вычисления предыдущих итераций. Как только выдается ошибка OutOfRange, все операции в функции будут немедленно завершены. Если это тот случай, которого вы хотели бы избежать, альтернативой, которая не выдает ошибку OutOfRange, является tf.distribute.DistributedIterator.get_next_as_optional()
. get_next_as_optional
возвращает tf.experimental.Optional
, который содержит следующий элемент или не содержит значения, если tf.distribute.DistributedIterator
достиг конца.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') 2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_0" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9 } } attr { key: "metadata" value { s: "\n\020RangeDataset:104" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
Использование свойства element_spec
Если вы передаете элементы распределенного набора данных в tf.function
и хотите получить гарантию tf.TypeSpec
, вы можете указать аргумент input_signature
tf.function
. Выходом распределенного набора данных является tf.distribute.DistributedValues
, который может представлять вход для одного устройства или нескольких устройств. Чтобы получить tf.TypeSpec
соответствующий этому распределенному значению, вы можете использовать свойство element_spec
распределенного набора данных или объекта распределенного итератора.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:122" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
Частичные партии
Частичные пакеты возникают, когда экземпляры tf.data.Dataset
, создаваемые пользователями, могут содержать размеры пакетов, которые не делятся без остатка на количество реплик, или когда кардинальность экземпляра набора данных не делится на размер пакета. Это означает, что когда набор данных распределен по нескольким репликам, next
вызов некоторых итераторов приведет к ошибке OutOfRangeError. Чтобы справиться с этим вариантом использования, tf.distribute
возвращает фиктивные пакеты с размером пакета 0 на репликах, у которых больше нет данных для обработки.
Для случая с одним рабочим, если данные не возвращаются при next
вызове итератора, создаются фиктивные пакеты нулевого размера, которые используются вместе с реальными данными в наборе данных. В случае частичных пакетов последний глобальный пакет данных будет содержать реальные данные наряду с фиктивными пакетами данных. Условие остановки обработки данных теперь проверяет, есть ли данные на какой-либо из реплик. Если нет данных ни на одной из реплик, выдается ошибка OutOfRange.
В случае с несколькими рабочими процессами логическое значение, представляющее наличие данных на каждом из рабочих процессов, агрегируется с использованием связи между репликами, и это используется для определения того, завершили ли все рабочие процессы обработку распределенного набора данных. Поскольку это включает в себя взаимодействие между работниками, это приводит к некоторому снижению производительности.
Предостережения
При использовании API-интерфейсов
tf.distribute.Strategy.experimental_distribute_dataset
с настройкой нескольких рабочих процессов пользователи передают наборtf.data.Dataset
, который считывается из файлов. Если дляtf.data.experimental.AutoShardPolicy
установлено значениеAUTO
илиFILE
, фактический размер пакета для каждого шага может быть меньше, чем определенный пользователем глобальный размер пакета. Это может произойти, когда оставшиеся элементы в файле меньше глобального размера пакета. Пользователи могут либо исчерпать набор данных, не завися от количества выполняемых шагов, либо установить дляtf.data.experimental.AutoShardPolicy
значениеDATA
, чтобы обойти это.Преобразования набора данных с отслеживанием состояния в настоящее время не поддерживаются с помощью
tf.distribute
и любые операции с отслеживанием состояния, которые может иметь набор данных, в настоящее время игнорируются. Например, если в вашем наборе данных естьmap_fn
, который используетtf.random.uniform
для поворота изображения, то у вас есть график набора данных, который зависит от состояния (то есть случайного начального числа) на локальном компьютере, где выполняется процесс Python.Экспериментальные
tf.data.experimental.OptimizationOptions
, отключенные по умолчанию, могут в определенных контекстах — например, при использовании вместе сtf.distribute
— привести к снижению производительности. Их следует включать только после того, как вы подтвердите, что они улучшают производительность вашей рабочей нагрузки в настройках распространения.Пожалуйста, обратитесь к этому руководству , чтобы узнать, как оптимизировать ваш конвейер ввода с помощью
tf.data
в целом. Несколько дополнительных советов:Если у вас есть несколько рабочих процессов и вы используете
tf.data.Dataset.list_files
для создания набора данных из всех файлов, соответствующих одному или нескольким шаблонам глобусов, не забудьте установитьseed
аргумент или установитьshuffle=False
, чтобы каждый рабочий процесс разбивал файл последовательно.Если ваш входной конвейер включает в себя как перетасовку данных на уровне записи, так и синтаксический анализ данных, если не проанализированные данные значительно больше, чем проанализированные данные (что обычно не так), сначала перетасуйте, а затем проанализируйте, как показано в следующем примере. Это может улучшить использование памяти и производительность.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
поддерживает внутренний буфер элементовbuffer_size
, и, таким образом, уменьшениеbuffer_size
может облегчить проблему OOM.Порядок, в котором данные обрабатываются воркерами при использовании
tf.distribute.experimental_distribute_dataset
илиtf.distribute.distribute_datasets_from_function
, не гарантируется. Обычно это требуется, если вы используетеtf.distribute
для предсказания масштаба. Однако вы можете вставить индекс для каждого элемента в пакете и соответствующим образом упорядочить выходные данные. Следующий фрагмент является примером того, как упорядочить выходные данные.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46} 2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_4" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9223372036854775807 } } attr { key: "metadata" value { s: "\n\020RangeDataset:162" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } }
Как мне распространять свои данные, если я не использую канонический экземпляр tf.data.Dataset?
Иногда пользователи не могут использовать tf.data.Dataset
для представления своих входных данных, а затем упомянутые выше API для распространения набора данных на несколько устройств. В таких случаях вы можете использовать необработанные тензоры или входные данные от генератора.
Используйте Experiment_distribute_values_from_function для произвольных тензорных входов
strategy.run
принимает tf.distribute.DistributedValues
, который является выходом next(iterator)
. Чтобы передать значения тензора, используйте tf.distribute.DistributedValues
experimental_distribute_values_from_function
необработанных тензоров.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
Используйте tf.data.Dataset.from_generator, если ваш ввод исходит от генератора
Если у вас есть функция генератора, которую вы хотите использовать, вы можете создать экземпляр tf.data.Dataset
с помощью API from_generator
.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2" op: "FlatMapDataset" input: "TensorDataset/_1" attr { key: "Targuments" value { list { } } } attr { key: "_cardinality" value { i: -2 } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_flat_map_fn_3980" } } } attr { key: "metadata" value { s: "\n\022FlatMapDataset:178" } } attr { key: "output_shapes" value { list { shape { dim { size: 4 } } } } } attr { key: "output_types" value { list { type: DT_FLOAT } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.