Voir sur TensorFlow.org | Exécuter dans Google Colab | Voir la source sur GitHub | Télécharger le cahier |
Les API tf.distribute offrent aux utilisateurs un moyen simple de faire évoluer leur formation d'une seule machine à plusieurs machines. Lors de la mise à l'échelle de leur modèle, les utilisateurs doivent également répartir leurs entrées sur plusieurs appareils. tf.distribute
fournit des API à l'aide desquelles vous pouvez distribuer automatiquement votre entrée sur tous les appareils.
Ce guide vous montrera les différentes manières de créer des ensembles de données distribués et des itérateurs à l'aide des API tf.distribute
. De plus, les sujets suivants seront abordés :
- Options d'utilisation, de partitionnement et de traitement par lots lors de l'utilisation
tf.distribute.Strategy.experimental_distribute_dataset
ettf.distribute.Strategy.distribute_datasets_from_function
. - Différentes manières d'itérer sur l'ensemble de données distribué.
- Différences entre les API
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
et les APItf.data
, ainsi que toute limitation que les utilisateurs peuvent rencontrer dans leur utilisation.
Ce guide ne couvre pas l'utilisation de l'entrée distribuée avec les API Keras.
Ensembles de données distribués
Pour utiliser les API tf.distribute
à l'échelle, il est recommandé aux utilisateurs d'utiliser tf.data.Dataset
pour représenter leur entrée. tf.distribute
a été conçu pour fonctionner efficacement avec tf.data.Dataset
(par exemple, la prélecture automatique des données sur chaque périphérique accélérateur) avec des optimisations de performances régulièrement intégrées à l'implémentation. Si vous avez un cas d'utilisation pour utiliser autre chose que tf.data.Dataset
, veuillez vous reporter à une section ultérieure de ce guide. Dans une boucle de formation non distribuée, les utilisateurs créent d'abord une instance tf.data.Dataset
, puis parcourent les éléments. Par example:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Pour permettre aux utilisateurs d'utiliser la stratégie tf.distribute
avec des modifications minimes du code existant d'un utilisateur, deux API ont été introduites pour distribuer une instance tf.data.Dataset
et renvoyer un objet d'ensemble de données distribué. Un utilisateur peut alors itérer sur cette instance d'ensemble de données distribué et former son modèle comme auparavant. Examinons maintenant les deux API - tf.distribute.Strategy.experimental_distribute_dataset
et tf.distribute.Strategy.distribute_datasets_from_function
plus en détail :
tf.distribute.Strategy.experimental_distribute_dataset
Usage
Cette API prend une instance tf.data.Dataset
en entrée et renvoie une instance tf.distribute.DistributedDataset
. Vous devez regrouper le jeu de données d'entrée avec une valeur égale à la taille de lot globale. Cette taille de lot globale correspond au nombre d'échantillons que vous souhaitez traiter sur tous les appareils en une seule étape. Vous pouvez itérer sur cet ensemble de données distribué de manière Pythonique ou créer un itérateur à l'aide de iter
. L'objet renvoyé n'est pas une instance tf.data.Dataset
et ne prend en charge aucune autre API qui transforme ou inspecte l'ensemble de données de quelque manière que ce soit. Il s'agit de l'API recommandée si vous ne disposez pas de moyens spécifiques pour répartir votre entrée sur différentes répliques.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>) 2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
Propriétés
Mise en lot
tf.distribute
rebat l'instance d'entrée tf.data.Dataset
avec une nouvelle taille de lot égale à la taille de lot globale divisée par le nombre de réplicas synchronisés. Le nombre de répliques synchronisées est égal au nombre d'appareils qui participent au gradient allreduce pendant l'entraînement. Lorsqu'un utilisateur appelle next
sur l'itérateur distribué, une taille de lot de données par réplica est renvoyée sur chaque réplica. La cardinalité du jeu de données remis en lot sera toujours un multiple du nombre de réplicas. Voici quelques exemples:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- Sans diffusion :
- Lot 1 : [0, 1, 2, 3]
- Lot 2 : [4, 5]
Avec distribution sur 2 répliques. Le dernier lot ([4, 5]) est partagé entre 2 répliques.
Lot 1 :
- Réplique 1 :[0, 1]
- Réplique 2 :[2, 3]
Lot 2 :
- Réplique 2 : [4]
- Réplique 2 : [5]
tf.data.Dataset.range(4).batch(4)
- Sans diffusion :
- Lot 1 : [[0], [1], [2], [3]]
- Avec distribution sur 5 répliques :
- Lot 1 :
- Réplique 1 : [0]
- Réplique 2 : [1]
- Réplique 3 : [2]
- Réplique 4 : [3]
- Réplique 5 : []
tf.data.Dataset.range(8).batch(4)
- Sans diffusion :
- Lot 1 : [0, 1, 2, 3]
- Lot 2 : [4, 5, 6, 7]
- Avec distribution sur 3 répliques :
- Lot 1 :
- Réplique 1 : [0, 1]
- Réplique 2 : [2, 3]
- Réplique 3 : []
- Lot 2 :
- Réplique 1 : [4, 5]
- Réplique 2 : [6, 7]
- Réplique 3 : []
Le rebatching du jeu de données a une complexité spatiale qui augmente linéairement avec le nombre de réplicas. Cela signifie que pour le cas d'utilisation de la formation multi-travailleurs, le pipeline d'entrée peut rencontrer des erreurs OOM.
Partage
tf.distribute
également l'ensemble de données d'entrée dans la formation multi-travailleurs avec MultiWorkerMirroredStrategy
et TPUStrategy
. Chaque ensemble de données est créé sur le périphérique CPU du travailleur. Le partitionnement automatique d'un ensemble de données sur un ensemble de nœuds de calcul signifie que chaque nœud de calcul se voit attribuer un sous-ensemble de l'ensemble de données (si le droit tf.data.experimental.AutoShardPolicy
est défini). Cela permet de s'assurer qu'à chaque étape, une taille de lot globale d'éléments d'ensemble de données qui ne se chevauchent pas sera traitée par chaque travailleur. Le partitionnement automatique a quelques options différentes qui peuvent être spécifiées à l'aide tf.data.experimental.DistributeOptions
. Notez qu'il n'y a pas de partitionnement automatique dans la formation multi-travailleurs avec ParameterServerStrategy
, et plus d'informations sur la création d'ensembles de données avec cette stratégie peuvent être trouvées dans le didacticiel Parameter Server Strategy .
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
Il existe trois options différentes que vous pouvez définir pour tf.data.experimental.AutoShardPolicy
:
- AUTO : Il s'agit de l'option par défaut, ce qui signifie qu'une tentative de partitionnement sera effectuée par FILE. La tentative de partitionnement par FILE échoue si un ensemble de données basé sur un fichier n'est pas détecté.
tf.distribute
reviendra alors au sharding par DATA. Notez que si l'ensemble de données d'entrée est basé sur des fichiers mais que le nombre de fichiers est inférieur au nombre de travailleurs, uneInvalidArgumentError
sera déclenchée. Si cela se produit, définissez explicitement la stratégie surAutoShardPolicy.DATA
ou divisez votre source d'entrée en fichiers plus petits de sorte que le nombre de fichiers soit supérieur au nombre de travailleurs. FICHIER : il s'agit de l'option si vous souhaitez répartir les fichiers d'entrée sur tous les nœuds de calcul. Vous devez utiliser cette option si le nombre de fichiers d'entrée est beaucoup plus grand que le nombre de nœuds de calcul et si les données des fichiers sont réparties uniformément. L'inconvénient de cette option est d'avoir des travailleurs inactifs si les données dans les fichiers ne sont pas uniformément réparties. Si le nombre de fichiers est inférieur au nombre de travailleurs, une
InvalidArgumentError
sera levée. Si cela se produit, définissez explicitement la stratégie surAutoShardPolicy.DATA
. Par exemple, distribuons 2 fichiers sur 2 workers avec 1 réplique chacun. Le fichier 1 contient [0, 1, 2, 3, 4, 5] et le fichier 2 contient [6, 7, 8, 9, 10, 11]. Supposons que le nombre total de répliques synchronisées soit de 2 et que la taille globale du lot soit de 4.- Travailleur 0 :
- Lot 1 = Réplique 1 : [0, 1]
- Lot 2 = Réplique 1 : [2, 3]
- Lot 3 = Réplique 1 : [4]
- Lot 4 = Réplique 1 : [5]
- Travailleur 1 :
- Lot 1 = Réplique 2 : [6, 7]
- Lot 2 = Réplique 2 : [8, 9]
- Lot 3 = Réplique 2 : [10]
- Lot 4 = Réplique 2 : [11]
DONNÉES : Cela autoshardira les éléments sur tous les travailleurs. Chacun des travailleurs lira l'intégralité de l'ensemble de données et ne traitera que le fragment qui lui est attribué. Tous les autres fragments seront jetés. Ceci est généralement utilisé si le nombre de fichiers d'entrée est inférieur au nombre de nœuds de calcul et que vous souhaitez un meilleur partage des données entre tous les nœuds de calcul. L'inconvénient est que l'intégralité de l'ensemble de données sera lu sur chaque travailleur. Par exemple, distribuons 1 fichiers sur 2 travailleurs. Le fichier 1 contient [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Soit le nombre total de répliques synchronisées égal à 2.
- Travailleur 0 :
- Lot 1 = Réplique 1 : [0, 1]
- Lot 2 = Réplique 1 : [4, 5]
- Lot 3 = Réplique 1 : [8, 9]
- Travailleur 1 :
- Lot 1 = Réplique 2 : [2, 3]
- Lot 2 = Réplique 2 : [6, 7]
- Lot 3 = Réplique 2 : [10, 11]
OFF : si vous désactivez le partitionnement automatique, chaque nœud de calcul traitera toutes les données. Par exemple, distribuons 1 fichiers sur 2 travailleurs. Le fichier 1 contient [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Supposons que le nombre total d'instances dupliquées synchronisées soit de 2. Ensuite, chaque nœud de calcul verra la distribution suivante :
- Travailleur 0 :
- Lot 1 = Réplique 1 : [0, 1]
- Lot 2 = Réplique 1 : [2, 3]
- Lot 3 = Réplique 1 : [4, 5]
- Lot 4 = Réplique 1 : [6, 7]
- Lot 5 = Réplique 1 : [8, 9]
Lot 6 = Réplique 1 : [10, 11]
Travailleur 1 :
Lot 1 = Réplique 2 : [0, 1]
Lot 2 = Réplique 2 : [2, 3]
Lot 3 = Réplique 2 : [4, 5]
Lot 4 = Réplique 2 : [6, 7]
Lot 5 = Réplique 2 : [8, 9]
Lot 6 = Réplique 2 : [10, 11]
Prélecture
Par défaut, tf.distribute
ajoute une transformation de prélecture à la fin de l'instance tf.data.Dataset
fournie par l'utilisateur. L'argument de la transformation de prélecture qui est buffer_size
est égal au nombre de répliques synchronisées.
tf.distribute.Strategy.distribute_datasets_from_function
Usage
Cette API prend une fonction d'entrée et renvoie une instance tf.distribute.DistributedDataset
. La fonction d'entrée que les utilisateurs transmettent a un argument tf.distribute.InputContext
et doit renvoyer une instance tf.data.Dataset
. Avec cette API, tf.distribute
n'apporte aucune autre modification à l'instance tf.data.Dataset
de l'utilisateur renvoyée par la fonction d'entrée. Il est de la responsabilité de l'utilisateur de regrouper et de fragmenter l'ensemble de données. tf.distribute
appelle la fonction d'entrée sur le périphérique CPU de chacun des travailleurs. En plus de permettre aux utilisateurs de spécifier leur propre logique de traitement par lots et de partitionnement, cette API démontre également une meilleure évolutivité et de meilleures performances par rapport à tf.distribute.Strategy.experimental_distribute_dataset
lorsqu'elle est utilisée pour la formation de plusieurs travailleurs.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Propriétés
Mise en lot
L'instance tf.data.Dataset
qui est la valeur de retour de la fonction d'entrée doit être regroupée en utilisant la taille de lot par réplica. La taille de lot par réplique correspond à la taille de lot globale divisée par le nombre de répliques qui participent à la formation de synchronisation. En effet tf.distribute
appelle la fonction d'entrée sur le périphérique CPU de chacun des travailleurs. L'ensemble de données créé sur un worker donné doit être prêt à être utilisé par toutes les répliques de ce worker.
Partage
L'objet tf.distribute.InputContext
qui est implicitement passé comme argument à la fonction d'entrée de l'utilisateur est créé par tf.distribute
sous le capot. Il contient des informations sur le nombre de travailleurs, l'ID de travailleur actuel, etc. Cette fonction d'entrée peut gérer le partitionnement conformément aux politiques définies par l'utilisateur à l'aide de ces propriétés qui font partie de l'objet tf.distribute.InputContext
.
Prélecture
tf.distribute
n'ajoute pas de transformation de prélecture à la fin du tf.data.Dataset
renvoyé par la fonction d'entrée fournie par l'utilisateur.
Itérateurs distribués
Semblable aux instances tf.data.Dataset
non distribuées, vous devrez créer un itérateur sur les instances tf.distribute.DistributedDataset
pour itérer dessus et accéder aux éléments du tf.distribute.DistributedDataset
. Voici les façons dont vous pouvez créer un tf.distribute.DistributedIterator
et l'utiliser pour entraîner votre modèle :
Coutumes
Utiliser une construction de boucle Pythonic for
Vous pouvez utiliser une boucle Pythonic conviviale pour parcourir le tf.distribute.DistributedDataset
. Les éléments renvoyés par tf.distribute.DistributedIterator
peuvent être un seul tf.Tensor
ou un tf.distribute.DistributedValues
qui contient une valeur par réplica. Placer la boucle dans une tf.function
donnera une amélioration des performances. Cependant, break
et return
ne sont actuellement pas pris en charge pour une boucle sur un tf.distribute.DistributedDataset
qui est placé à l'intérieur d'un tf.function
.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\020TensorDataset:29" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Utiliser iter
pour créer un itérateur explicite
Pour parcourir les éléments d'une instance tf.distribute.DistributedDataset
, vous pouvez créer un tf.distribute.DistributedIterator
à l'aide de l'API iter
dessus. Avec un itérateur explicite, vous pouvez itérer sur un nombre fixe d'étapes. Pour obtenir l'élément suivant d'une instance tf.distribute.DistributedIterator
dist_iterator
, vous pouvez appeler next(dist_iterator)
, dist_iterator.get_next()
ou dist_iterator.get_next_as_optional()
. Les deux premiers sont essentiellement les mêmes :
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
Avec next()
ou tf.distribute.DistributedIterator.get_next()
, si le tf.distribute.DistributedIterator
a atteint sa fin, une erreur OutOfRange sera levée. Le client peut détecter l'erreur du côté python et continuer à effectuer d'autres travaux tels que les points de contrôle et l'évaluation. Cependant, cela ne fonctionnera pas si vous utilisez une boucle de formation hôte (c'est-à-dire, exécutez plusieurs étapes par tf.function
), qui ressemble à :
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
contient plusieurs étapes en enveloppant le corps de l'étape dans un tf.range
. Dans ce cas, différentes itérations dans la boucle sans dépendance peuvent démarrer en parallèle, de sorte qu'une erreur OutOfRange peut être déclenchée dans les itérations ultérieures avant la fin du calcul des itérations précédentes. Une fois qu'une erreur OutOfRange est générée, toutes les opérations de la fonction seront immédiatement terminées. S'il s'agit d'un cas que vous souhaitez éviter, une alternative qui ne génère pas d'erreur OutOfRange est tf.distribute.DistributedIterator.get_next_as_optional()
. get_next_as_optional
renvoie un tf.experimental.Optional
qui contient l'élément suivant ou aucune valeur si le tf.distribute.DistributedIterator
a atteint sa fin.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') 2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_0" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9 } } attr { key: "metadata" value { s: "\n\020RangeDataset:104" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
Utilisation de la propriété element_spec
Si vous transmettez les éléments d'un ensemble de données distribué à une tf.function
et souhaitez une garantie tf.TypeSpec
, vous pouvez spécifier l'argument input_signature
de la tf.function
. La sortie d'un ensemble de données distribué est tf.distribute.DistributedValues
qui peut représenter l'entrée d'un seul appareil ou de plusieurs appareils. Pour obtenir le tf.TypeSpec
correspondant à cette valeur distribuée, vous pouvez utiliser la propriété element_spec
de l'ensemble de données distribué ou de l'objet itérateur distribué.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:122" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
Lots partiels
Des lots partiels sont rencontrés lorsque les instances tf.data.Dataset
par les utilisateurs peuvent contenir des tailles de lot qui ne sont pas uniformément divisibles par le nombre de répliques ou lorsque la cardinalité de l'instance de l'ensemble de données n'est pas divisible par la taille du lot. Cela signifie que lorsque l'ensemble de données est distribué sur plusieurs répliques, le next
appel sur certains itérateurs entraînera une OutOfRangeError. Pour gérer ce cas d'utilisation, tf.distribute
renvoie des lots factices de taille de lot 0 sur les répliques qui n'ont plus de données à traiter.
Pour le cas d'un seul travailleur, si les données ne sont pas renvoyées par le next
appel sur l'itérateur, des lots factices de taille de lot 0 sont créés et utilisés avec les données réelles dans l'ensemble de données. Dans le cas de lots partiels, le dernier lot global de données contiendra des données réelles aux côtés de lots de données factices. La condition d'arrêt du traitement des données vérifie désormais si l'un des réplicas contient des données. S'il n'y a aucune donnée sur l'un des réplicas, une erreur OutOfRange est renvoyée.
Dans le cas de plusieurs travailleurs, la valeur booléenne représentant la présence de données sur chacun des travailleurs est agrégée à l'aide d'une communication entre répliques et est utilisée pour déterminer si tous les travailleurs ont fini de traiter l'ensemble de données distribué. Étant donné que cela implique une communication entre les travailleurs, il y a une certaine pénalité de performance impliquée.
Mises en garde
Lors de l'utilisation des API
tf.distribute.Strategy.experimental_distribute_dataset
avec une configuration à plusieurs nœuds de calcul, les utilisateurs transmettent untf.data.Dataset
qui lit les fichiers. Sitf.data.experimental.AutoShardPolicy
est défini surAUTO
ouFILE
, la taille de lot réelle par étape peut être inférieure à la taille de lot globale définie par l'utilisateur. Cela peut se produire lorsque les éléments restants dans le fichier sont inférieurs à la taille globale du lot. Les utilisateurs peuvent soit épuiser l'ensemble de données sans dépendre du nombre d'étapes à exécuter, soit définirtf.data.experimental.AutoShardPolicy
surDATA
pour contourner ce problème.Les transformations d'ensemble de données avec état ne sont actuellement pas prises en charge avec
tf.distribute
et toutes les opérations avec état que l'ensemble de données peut avoir sont actuellement ignorées. Par exemple, si votre jeu de données a unmap_fn
qui utilisetf.random.uniform
pour faire pivoter une image, alors vous avez un graphe de jeu de données qui dépend de l'état (c'est-à-dire la graine aléatoire) sur la machine locale où le processus python est exécuté.Les options expérimentales
tf.data.experimental.OptimizationOptions
désactivées par défaut peuvent, dans certains contextes, par exemple lorsqu'elles sont utilisées avectf.distribute
, entraîner une dégradation des performances. Vous ne devez les activer qu'après avoir validé qu'ils profitent aux performances de votre charge de travail dans un paramètre de distribution.Veuillez vous référer à ce guide pour savoir comment optimiser votre pipeline d'entrée avec
tf.data
en général. Quelques conseils supplémentaires :Si vous avez plusieurs nœuds de calcul et que vous utilisez
tf.data.Dataset.list_files
pour créer un ensemble de données à partir de tous les fichiers correspondant à un ou plusieurs modèles glob, n'oubliez pas de définir l'argumentseed
ou de définirshuffle=False
afin que chaque nœud de calcul partitionne le fichier de manière cohérente.Si votre pipeline d'entrée comprend à la fois le brassage des données au niveau de l'enregistrement et l'analyse des données, à moins que les données non analysées ne soient beaucoup plus volumineuses que les données analysées (ce qui n'est généralement pas le cas), mélangez d'abord, puis analysez, comme illustré dans l'exemple suivant. Cela peut améliorer l'utilisation de la mémoire et les performances.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
maintient un tampon interne d'élémentsbuffer_size
, et ainsi réduirebuffer_size
pourrait atténuer le problème OOM.L'ordre dans lequel les données sont traitées par les travailleurs lors de l'utilisation
tf.distribute.experimental_distribute_dataset
outf.distribute.distribute_datasets_from_function
n'est pas garanti. Ceci est généralement requis si vous utiliseztf.distribute
pour mettre à l'échelle la prédiction. Vous pouvez cependant insérer un index pour chaque élément du lot et ordonner les sorties en conséquence. L'extrait de code suivant est un exemple de la façon de commander les sorties.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46} 2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_4" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9223372036854775807 } } attr { key: "metadata" value { s: "\n\020RangeDataset:162" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } }
Comment distribuer mes données si je n'utilise pas une instance canonique tf.data.Dataset ?
Parfois, les utilisateurs ne peuvent pas utiliser un tf.data.Dataset
pour représenter leur entrée et par la suite les API mentionnées ci-dessus pour distribuer l'ensemble de données à plusieurs appareils. Dans de tels cas, vous pouvez utiliser des tenseurs bruts ou des entrées d'un générateur.
Utilisez experimental_distribute_values_from_function pour les entrées de tenseur arbitraires
strategy.run
accepte tf.distribute.DistributedValues
qui est la sortie de next(iterator)
. Pour transmettre les valeurs de tenseur, utilisez experimental_distribute_values_from_function
pour construire tf.distribute.DistributedValues
à partir de tenseurs bruts.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
Utilisez tf.data.Dataset.from_generator si votre entrée provient d'un générateur
Si vous souhaitez utiliser une fonction de générateur, vous pouvez créer une instance tf.data.Dataset
à l'aide de l'API from_generator
.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2" op: "FlatMapDataset" input: "TensorDataset/_1" attr { key: "Targuments" value { list { } } } attr { key: "_cardinality" value { i: -2 } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_flat_map_fn_3980" } } } attr { key: "metadata" value { s: "\n\022FlatMapDataset:178" } } attr { key: "output_shapes" value { list { shape { dim { size: 4 } } } } } attr { key: "output_types" value { list { type: DT_FLOAT } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.