Ver en TensorFlow.org | Ejecutar en Google Colab | Ver fuente en GitHub | Descargar libreta |
Las API de tf.distribute brindan una manera fácil para que los usuarios escalen su capacitación de una sola máquina a varias máquinas. Al escalar su modelo, los usuarios también tienen que distribuir su entrada a través de múltiples dispositivos. tf.distribute
proporciona API con las que puede distribuir automáticamente su entrada entre dispositivos.
Esta guía le mostrará las diferentes formas en que puede crear conjuntos de datos distribuidos e iteradores utilizando las API tf.distribute
. Además, se tratarán los siguientes temas:
- Opciones de uso, fragmentación y procesamiento por lotes al usar
tf.distribute.Strategy.experimental_distribute_dataset
ytf.distribute.Strategy.distribute_datasets_from_function
. - Diferentes formas en las que puede iterar sobre el conjunto de datos distribuido.
- Las diferencias entre las API
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
y las APItf.data
, así como cualquier limitación que los usuarios puedan encontrar en su uso.
Esta guía no cubre el uso de entrada distribuida con las API de Keras.
Conjuntos de datos distribuidos
Para usar las API de tf.distribute
para escalar, se recomienda que los usuarios usen tf.data.Dataset
para representar su entrada. tf.distribute
se ha hecho para que funcione de manera eficiente con tf.data.Dataset
(por ejemplo, búsqueda previa automática de datos en cada dispositivo acelerador) con optimizaciones de rendimiento que se incorporan regularmente en la implementación. Si tiene un caso de uso para usar algo que no sea tf.data.Dataset
, consulte una sección posterior de esta guía. En un ciclo de entrenamiento no distribuido, los usuarios primero crean una instancia de tf.data.Dataset
y luego iteran sobre los elementos. Por ejemplo:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Para permitir que los usuarios utilicen la estrategia tf.distribute
con cambios mínimos en el código existente de un usuario, se introdujeron dos API que distribuirían una instancia de tf.data.Dataset
y devolverían un objeto de conjunto de datos distribuido. Luego, un usuario podría iterar sobre esta instancia de conjunto de datos distribuidos y entrenar su modelo como antes. Veamos ahora las dos API: tf.distribute.Strategy.experimental_distribute_dataset
y tf.distribute.Strategy.distribute_datasets_from_function
con más detalle:
tf.distribute.Strategy.experimental_distribute_dataset
Uso
Esta API toma una instancia de tf.data.Dataset
como entrada y devuelve una instancia de tf.distribute.DistributedDataset
. Debe agrupar el conjunto de datos de entrada con un valor que sea igual al tamaño del lote global. Este tamaño de lote global es la cantidad de muestras que desea procesar en todos los dispositivos en 1 paso. Puede iterar sobre este conjunto de datos distribuido de forma Pythonic o crear un iterador usando iter
. El objeto devuelto no es una instancia de tf.data.Dataset
y no admite ninguna otra API que transforme o inspeccione el conjunto de datos de ninguna manera. Esta es la API recomendada si no tiene formas específicas en las que desea fragmentar su entrada en diferentes réplicas.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>) 2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
Propiedades
procesamiento por lotes
tf.distribute
vuelve a procesar la instancia de tf.data.Dataset
de entrada con un nuevo tamaño de lote que es igual al tamaño de lote global dividido por el número de réplicas sincronizadas. El número de réplicas sincronizadas es igual al número de dispositivos que participan en el gradiente allreduce durante el entrenamiento. Cuando un usuario llama a next
al iterador distribuido, se devuelve un tamaño de lote de datos por réplica en cada réplica. La cardinalidad del conjunto de datos reagrupado siempre será un múltiplo del número de réplicas. Aquí hay un par de ejemplos:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- Sin distribución:
- Lote 1: [0, 1, 2, 3]
- Lote 2: [4, 5]
Con distribución en 2 réplicas. El último lote ([4, 5]) se divide entre 2 réplicas.
Lote 1:
- Réplica 1:[0, 1]
- Réplica 2:[2, 3]
Lote 2:
- Réplica 2: [4]
- Réplica 2: [5]
tf.data.Dataset.range(4).batch(4)
- Sin distribución:
- Lote 1: [[0], [1], [2], [3]]
- Con distribución sobre 5 réplicas:
- Lote 1:
- Réplica 1: [0]
- Réplica 2: [1]
- Réplica 3: [2]
- Réplica 4: [3]
- Réplica 5: []
tf.data.Dataset.range(8).batch(4)
- Sin distribución:
- Lote 1: [0, 1, 2, 3]
- Lote 2: [4, 5, 6, 7]
- Con distribución en 3 réplicas:
- Lote 1:
- Réplica 1: [0, 1]
- Réplica 2: [2, 3]
- Réplica 3: []
- Lote 2:
- Réplica 1: [4, 5]
- Réplica 2: [6, 7]
- Réplica 3: []
El reagrupamiento del conjunto de datos tiene una complejidad de espacio que aumenta linealmente con el número de réplicas. Esto significa que para el caso de uso de capacitación de varios trabajadores, la canalización de entrada puede generar errores OOM.
fragmentación
tf.distribute
también fragmenta automáticamente el conjunto de datos de entrada en el entrenamiento de varios trabajadores con MultiWorkerMirroredStrategy
y TPUStrategy
. Cada conjunto de datos se crea en el dispositivo de CPU del trabajador. La fragmentación automática de un conjunto de datos sobre un conjunto de trabajadores significa que a cada trabajador se le asigna un subconjunto del conjunto de datos completo (si se establece la tf.data.experimental.AutoShardPolicy
correcta). Esto es para garantizar que en cada paso, cada trabajador procesará un tamaño de lote global de elementos del conjunto de datos que no se superpongan. Autosharding tiene un par de opciones diferentes que se pueden especificar mediante tf.data.experimental.DistributeOptions
. Tenga en cuenta que no hay fragmentación automática en el entrenamiento de varios trabajadores con ParameterServerStrategy
, y se puede encontrar más información sobre la creación de conjuntos de datos con esta estrategia en el tutorial Estrategia de servidor de parámetros .
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
Hay tres opciones diferentes que puede configurar para tf.data.experimental.AutoShardPolicy
:
- AUTO: esta es la opción predeterminada, lo que significa que ARCHIVO intentará fragmentar. El intento de fragmentar por ARCHIVO falla si no se detecta un conjunto de datos basado en archivos.
tf.distribute
luego recurrirá a fragmentación por DATA. Tenga en cuenta que si el conjunto de datos de entrada está basado en archivos pero la cantidad de archivos es menor que la cantidad de trabajadores, se generará unInvalidArgumentError
. Si esto sucede, establezca explícitamente la política enAutoShardPolicy.DATA
o divida su fuente de entrada en archivos más pequeños, de modo que la cantidad de archivos sea mayor que la cantidad de trabajadores. ARCHIVO: esta es la opción si desea dividir los archivos de entrada en todos los trabajadores. Debe usar esta opción si la cantidad de archivos de entrada es mucho mayor que la cantidad de trabajadores y los datos en los archivos están distribuidos uniformemente. La desventaja de esta opción es tener trabajadores inactivos si los datos de los archivos no se distribuyen uniformemente. Si la cantidad de archivos es menor que la cantidad de trabajadores, se generará un
InvalidArgumentError
. Si esto sucede, establezca explícitamente la política enAutoShardPolicy.DATA
. Por ejemplo, distribuyamos 2 archivos entre 2 trabajadores con 1 réplica cada uno. El archivo 1 contiene [0, 1, 2, 3, 4, 5] y el archivo 2 contiene [6, 7, 8, 9, 10, 11]. Deje que el número total de réplicas sincronizadas sea 2 y el tamaño del lote global sea 4.- Trabajador 0:
- Lote 1 = Réplica 1: [0, 1]
- Lote 2 = Réplica 1: [2, 3]
- Lote 3 = Réplica 1: [4]
- Lote 4 = Réplica 1: [5]
- Trabajador 1:
- Lote 1 = Réplica 2: [6, 7]
- Lote 2 = Réplica 2: [8, 9]
- Lote 3 = Réplica 2: [10]
- Lote 4 = Réplica 2: [11]
DATOS: Esto fragmentará automáticamente los elementos en todos los trabajadores. Cada uno de los trabajadores leerá el conjunto de datos completo y solo procesará el fragmento que se le asignó. Todos los demás fragmentos serán descartados. Esto generalmente se usa si la cantidad de archivos de entrada es menor que la cantidad de trabajadores y desea una mejor fragmentación de los datos entre todos los trabajadores. La desventaja es que todo el conjunto de datos se leerá en cada trabajador. Por ejemplo, distribuyamos 1 archivo entre 2 trabajadores. El archivo 1 contiene [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Deje que el número total de réplicas sincronizadas sea 2.
- Trabajador 0:
- Lote 1 = Réplica 1: [0, 1]
- Lote 2 = Réplica 1: [4, 5]
- Lote 3 = Réplica 1: [8, 9]
- Trabajador 1:
- Lote 1 = Réplica 2: [2, 3]
- Lote 2 = Réplica 2: [6, 7]
- Lote 3 = Réplica 2: [10, 11]
DESACTIVADO: si desactiva la fragmentación automática, cada trabajador procesará todos los datos. Por ejemplo, distribuyamos 1 archivo entre 2 trabajadores. El archivo 1 contiene [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Deje que el número total de réplicas sincronizadas sea 2. Luego, cada trabajador verá la siguiente distribución:
- Trabajador 0:
- Lote 1 = Réplica 1: [0, 1]
- Lote 2 = Réplica 1: [2, 3]
- Lote 3 = Réplica 1: [4, 5]
- Lote 4 = Réplica 1: [6, 7]
- Lote 5 = Réplica 1: [8, 9]
Lote 6 = Réplica 1: [10, 11]
Trabajador 1:
Lote 1 = Réplica 2: [0, 1]
Lote 2 = Réplica 2: [2, 3]
Lote 3 = Réplica 2: [4, 5]
Lote 4 = Réplica 2: [6, 7]
Lote 5 = Réplica 2: [8, 9]
Lote 6 = Réplica 2: [10, 11]
captación previa
De forma predeterminada, tf.distribute
agrega una transformación de captación previa al final de la instancia de tf.data.Dataset
proporcionada por el usuario. El argumento de la transformación de captación previa, que es buffer_size
es igual al número de réplicas sincronizadas.
tf.distribute.Strategy.distribute_datasets_from_function
Uso
Esta API toma una función de entrada y devuelve una instancia de tf.distribute.DistributedDataset
. La función de entrada que los usuarios pasan tiene un argumento tf.distribute.InputContext
y debe devolver una instancia de tf.data.Dataset
. Con esta API, tf.distribute
no realiza más cambios en la instancia tf.data.Dataset
del usuario devuelta desde la función de entrada. Es responsabilidad del usuario agrupar y fragmentar el conjunto de datos. tf.distribute
llama a la función de entrada en el dispositivo CPU de cada uno de los trabajadores. Además de permitir a los usuarios especificar su propia lógica de procesamiento por lotes y fragmentación, esta API también demuestra una mejor escalabilidad y rendimiento en comparación con tf.distribute.Strategy.experimental_distribute_dataset
cuando se usa para la capacitación de varios trabajadores.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Propiedades
procesamiento por lotes
La instancia tf.data.Dataset
que es el valor de retorno de la función de entrada debe procesarse por lotes utilizando el tamaño de lote por réplica. El tamaño del lote por réplica es el tamaño del lote global dividido por la cantidad de réplicas que participan en el entrenamiento de sincronización. Esto se debe a que tf.distribute
llama a la función de entrada en el dispositivo de CPU de cada uno de los trabajadores. El conjunto de datos que se crea en un trabajador determinado debe estar listo para que lo usen todas las réplicas de ese trabajador.
fragmentación
El objeto tf.distribute.InputContext
que se pasa implícitamente como argumento a la función de entrada del usuario es creado por tf.distribute
bajo el capó. Tiene información sobre la cantidad de trabajadores, la identificación del trabajador actual, etc. Esta función de entrada puede manejar la fragmentación según las políticas establecidas por el usuario utilizando estas propiedades que forman parte del objeto tf.distribute.InputContext
.
captación previa
tf.distribute
no agrega una transformación de captación previa al final de tf.data.Dataset
devuelto por la función de entrada proporcionada por el usuario.
Iteradores distribuidos
De forma similar a las instancias de tf.data.Dataset
no distribuidas, deberá crear un iterador en las instancias de tf.distribute.DistributedDataset
para iterar sobre él y acceder a los elementos en tf.distribute.DistributedDataset
. Las siguientes son las formas en que puede crear un tf.distribute.DistributedIterator
y usarlo para entrenar su modelo:
usos
Use una construcción Pythonic for loop
Puede usar un bucle Pythonic fácil de usar para iterar sobre el tf.distribute.DistributedDataset
. Los elementos devueltos por tf.distribute.DistributedIterator
pueden ser un solo tf.Tensor
o un tf.distribute.DistributedValues
que contiene un valor por réplica. Si coloca el bucle dentro de una tf.function
., aumentará el rendimiento. Sin embargo, break
y return
actualmente no son compatibles con un bucle sobre un tf.distribute.DistributedDataset
que se coloca dentro de una tf.function
.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\020TensorDataset:29" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Use iter
para crear un iterador explícito
Para iterar sobre los elementos en una instancia de tf.distribute.DistributedDataset
, puede crear un tf.distribute.DistributedIterator
utilizando la API iter
en él. Con un iterador explícito, puede iterar durante un número fijo de pasos. Para obtener el siguiente elemento de una instancia de tf.distribute.DistributedIterator
dist_iterator
, puede llamar a next(dist_iterator)
, dist_iterator.get_next()
o dist_iterator.get_next_as_optional()
. Los dos primeros son esencialmente lo mismo:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
Con next()
o tf.distribute.DistributedIterator.get_next()
, si tf.distribute.DistributedIterator
ha llegado a su fin, se generará un error OutOfRange. El cliente puede detectar el error en el lado de python y continuar con otros trabajos, como puntos de control y evaluación. Sin embargo, esto no funcionará si está utilizando un ciclo de entrenamiento de host (es decir, ejecuta varios pasos por tf.function
), que se ve así:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
contiene varios pasos al envolver el cuerpo del paso dentro de un tf.range
. En este caso, diferentes iteraciones en el bucle sin dependencia podrían comenzar en paralelo, por lo que se puede desencadenar un error OutOfRange en iteraciones posteriores antes de que finalice el cálculo de las iteraciones anteriores. Una vez que se lanza un error OutOfRange, todas las operaciones en la función se terminarán de inmediato. Si este es un caso que le gustaría evitar, una alternativa que no genera un error OutOfRange es tf.distribute.DistributedIterator.get_next_as_optional()
. get_next_as_optional
devuelve un tf.experimental.Optional
que contiene el siguiente elemento o ningún valor si tf.distribute.DistributedIterator
ha llegado a su fin.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') 2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_0" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9 } } attr { key: "metadata" value { s: "\n\020RangeDataset:104" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
Usando la propiedad element_spec
Si pasa los elementos de un conjunto de datos distribuido a una tf.function
y desea una garantía de tf.TypeSpec
, puede especificar el argumento input_signature
de la tf.function
. La salida de un conjunto de datos distribuido es tf.distribute.DistributedValues
, que puede representar la entrada a un solo dispositivo o varios dispositivos. Para obtener el tf.TypeSpec
correspondiente a este valor distribuido, puede usar la propiedad element_spec
del conjunto de datos distribuido o el objeto iterador distribuido.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:122" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
Lotes parciales
Los lotes parciales se encuentran cuando las instancias tf.data.Dataset
que crean los usuarios pueden contener tamaños de lote que no son divisibles por igual por el número de réplicas o cuando la cardinalidad de la instancia del conjunto de datos no es divisible por el tamaño del lote. Esto significa que cuando el conjunto de datos se distribuye en varias réplicas, la next
llamada a algunos iteradores dará como resultado un OutOfRangeError. Para manejar este caso de uso, tf.distribute
devuelve lotes ficticios de tamaño de lote 0 en réplicas que no tienen más datos para procesar.
Para el caso de un solo trabajador, si la next
llamada del iterador no devuelve los datos, se crean lotes ficticios de tamaño de lote 0 y se usan junto con los datos reales en el conjunto de datos. En el caso de lotes parciales, el último lote global de datos contendrá datos reales junto con lotes de datos ficticios. La condición de parada para el procesamiento de datos ahora comprueba si alguna de las réplicas tiene datos. Si no hay datos en ninguna de las réplicas, se genera un error OutOfRange.
Para el caso de varios trabajadores, el valor booleano que representa la presencia de datos en cada uno de los trabajadores se agrega mediante comunicación entre réplicas y se usa para identificar si todos los trabajadores han terminado de procesar el conjunto de datos distribuido. Dado que esto implica la comunicación entre trabajadores, existe una penalización de rendimiento.
Advertencias
Cuando se usan las API
tf.distribute.Strategy.experimental_distribute_dataset
con una configuración de varios trabajadores, los usuarios pasan untf.data.Dataset
que lee archivos. Sitf.data.experimental.AutoShardPolicy
se establece enAUTO
oFILE
, el tamaño de lote real por paso puede ser más pequeño que el tamaño de lote global definido por el usuario. Esto puede suceder cuando los elementos restantes en el archivo son menores que el tamaño del lote global. Los usuarios pueden agotar el conjunto de datos sin depender de la cantidad de pasos a ejecutar o configurartf.data.experimental.AutoShardPolicy
enDATA
para evitarlo.Las transformaciones de conjuntos de datos con estado actualmente no son compatibles con
tf.distribute
y cualquier operación con estado que el conjunto de datos pueda tener se ignora actualmente. Por ejemplo, si su conjunto de datos tiene unmap_fn
que usatf.random.uniform
para rotar una imagen, entonces tiene un gráfico de conjunto de datos que depende del estado (es decir, la semilla aleatoria) en la máquina local donde se ejecuta el proceso de python.Las
tf.data.experimental.OptimizationOptions
experimentales que están deshabilitadas de manera predeterminada pueden, en ciertos contextos, como cuando se usan junto contf.distribute
, causar una degradación del rendimiento. Solo debe habilitarlos después de validar que benefician el rendimiento de su carga de trabajo en una configuración de distribución.Consulte esta guía para saber cómo optimizar su flujo de entrada con
tf.data
en general. Algunos consejos adicionales:Si tiene varios trabajadores y está utilizando
tf.data.Dataset.list_files
para crear un conjunto de datos a partir de todos los archivos que coincidan con uno o más patrones globales, recuerde establecer el argumentoseed
o establecershuffle=False
para que cada trabajador fragmente el archivo de forma coherente.Si su canalización de entrada incluye mezclar los datos en el nivel de registro y analizar los datos, a menos que los datos no analizados sean significativamente más grandes que los datos analizados (que generalmente no es el caso), primero mezcle y luego analice, como se muestra en el siguiente ejemplo. Esto puede beneficiar el uso y el rendimiento de la memoria.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
mantiene un búfer interno de elementos debuffer_size
de búfer y, por lo tanto, reducir el tamaño debuffer_size
podría aliviar el problema de OOM.No se garantiza el orden en que los trabajadores procesan los datos cuando usan
tf.distribute.experimental_distribute_dataset
otf.distribute.distribute_datasets_from_function
. Esto suele ser necesario si utilizatf.distribute
para escalar la predicción. Sin embargo, puede insertar un índice para cada elemento del lote y ordenar las salidas en consecuencia. El siguiente fragmento es un ejemplo de cómo ordenar salidas.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46} 2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_4" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9223372036854775807 } } attr { key: "metadata" value { s: "\n\020RangeDataset:162" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } }
¿Cómo distribuyo mis datos si no estoy usando una instancia canónica de tf.data.Dataset?
A veces, los usuarios no pueden usar un tf.data.Dataset
para representar su entrada y, posteriormente, las API mencionadas anteriormente para distribuir el conjunto de datos a varios dispositivos. En tales casos, puede usar tensores sin procesar o entradas de un generador.
Use experimental_distribute_values_from_function para entradas de tensor arbitrarias
tf.distribute.DistributedValues
strategy.run
es el resultado de next(iterator)
. Para pasar los valores del tensor, use experimental_distribute_values_from_function
para construir tf.distribute.DistributedValues
a partir de tensores sin procesar.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
Use tf.data.Dataset.from_generator si su entrada es de un generador
Si tiene una función de generador que desea usar, puede crear una instancia de tf.data.Dataset
usando la API from_generator
.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2" op: "FlatMapDataset" input: "TensorDataset/_1" attr { key: "Targuments" value { list { } } } attr { key: "_cardinality" value { i: -2 } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_flat_map_fn_3980" } } } attr { key: "metadata" value { s: "\n\022FlatMapDataset:178" } } attr { key: "output_shapes" value { list { shape { dim { size: 4 } } } } } attr { key: "output_types" value { list { type: DT_FLOAT } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.