Visualizza su TensorFlow.org | Esegui in Google Colab | Visualizza l'origine su GitHub | Scarica quaderno |
Le API tf.distribute forniscono agli utenti un modo semplice per scalare la formazione da una singola macchina a più macchine. Quando si ridimensiona il proprio modello, gli utenti devono anche distribuire il proprio input su più dispositivi. tf.distribute
fornisce API che consentono di distribuire automaticamente l'input su tutti i dispositivi.
Questa guida ti mostrerà i diversi modi in cui puoi creare set di dati distribuiti e iteratori utilizzando le API tf.distribute
. Inoltre verranno trattati i seguenti argomenti:
- Opzioni di utilizzo, partizionamento orizzontale e batch quando si utilizzano
tf.distribute.Strategy.experimental_distribute_dataset
etf.distribute.Strategy.distribute_datasets_from_function
. - Diversi modi in cui è possibile eseguire l'iterazione sul set di dati distribuito.
- Differenze tra le API
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
e le APItf.data
, nonché eventuali limitazioni che gli utenti possono incontrare nel loro utilizzo.
Questa guida non copre l'utilizzo dell'input distribuito con le API Keras.
Set di dati distribuiti
Per utilizzare le API tf.distribute
per la scalabilità, si consiglia agli utenti di utilizzare tf.data.Dataset
per rappresentare il proprio input. tf.distribute
è stato progettato per funzionare in modo efficiente con tf.data.Dataset
(ad esempio, precaricamento automatico dei dati su ciascun dispositivo acceleratore) con ottimizzazioni delle prestazioni regolarmente incorporate nell'implementazione. Se si dispone di un caso d'uso per l'utilizzo di qualcosa di diverso da tf.data.Dataset
, fare riferimento a una sezione successiva in questa guida. In un ciclo di addestramento non distribuito, gli utenti creano prima un'istanza tf.data.Dataset
e quindi ripetono gli elementi. Per esempio:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Per consentire agli utenti di utilizzare la strategia tf.distribute
con modifiche minime al codice esistente di un utente, sono state introdotte due API che distribuirebbero un'istanza tf.data.Dataset
e restituirebbero un oggetto dataset distribuito. Un utente può quindi eseguire l'iterazione su questa istanza del set di dati distribuito e addestrare il proprio modello come prima. Esaminiamo ora le due API: tf.distribute.Strategy.experimental_distribute_dataset
e tf.distribute.Strategy.distribute_datasets_from_function
in modo più dettagliato:
tf.distribute.Strategy.experimental_distribute_dataset
Utilizzo
Questa API accetta un'istanza tf.data.Dataset
come input e restituisce un'istanza tf.distribute.DistributedDataset
. È necessario eseguire il batch del set di dati di input con un valore uguale alla dimensione batch globale. Questa dimensione batch globale è il numero di campioni che desideri elaborare su tutti i dispositivi in 1 passaggio. Puoi scorrere questo set di dati distribuito in modo Pythonic o creare un iteratore usando iter
. L'oggetto restituito non è un'istanza tf.data.Dataset
e non supporta altre API che trasformano o ispezionano il set di dati in alcun modo. Questa è l'API consigliata se non disponi di modi specifici in cui desideri suddividere l'input su repliche diverse.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>) 2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
Proprietà
Dosaggio
tf.distribute
ribatta l'istanza di input tf.data.Dataset
con una nuova dimensione del batch uguale alla dimensione del batch globale divisa per il numero di repliche sincronizzate. Il numero di repliche sincronizzate è uguale al numero di dispositivi che partecipano al gradiente allriduce durante l'allenamento. Quando un utente effettua la chiamata next
sull'iteratore distribuito, viene restituita una dimensione batch di dati per replica su ciascuna replica. La cardinalità del set di dati ribattuto sarà sempre un multiplo del numero di repliche. Qui ci sono un paio di esempi:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- Senza distribuzione:
- Lotto 1: [0, 1, 2, 3]
- Lotto 2: [4, 5]
Con distribuzione su 2 repliche. L'ultimo batch ([4, 5]) è suddiviso tra 2 repliche.
Lotto 1:
- Replica 1:[0, 1]
- Replica 2:[2, 3]
Lotto 2:
- Replica 2: [4]
- Replica 2: [5]
tf.data.Dataset.range(4).batch(4)
- Senza distribuzione:
- Lotto 1: [[0], [1], [2], [3]]
- Con distribuzione su 5 repliche:
- Lotto 1:
- Replica 1: [0]
- Replica 2: [1]
- Replica 3: [2]
- Replica 4: [3]
- Replica 5: []
tf.data.Dataset.range(8).batch(4)
- Senza distribuzione:
- Lotto 1: [0, 1, 2, 3]
- Lotto 2: [4, 5, 6, 7]
- Con distribuzione su 3 repliche:
- Lotto 1:
- Replica 1: [0, 1]
- Replica 2: [2, 3]
- Replica 3: []
- Lotto 2:
- Replica 1: [4, 5]
- Replica 2: [6, 7]
- Replica 3: []
Il rebatching del set di dati ha una complessità spaziale che aumenta linearmente con il numero di repliche. Ciò significa che per il caso d'uso della formazione per più lavoratori, la pipeline di input può incorrere in errori OOM.
Frammentazione
tf.distribute
inoltre il partizionamento automatico del set di dati di input nella formazione multi-worker con MultiWorkerMirroredStrategy
e TPUStrategy
. Ogni set di dati viene creato sul dispositivo CPU del lavoratore. L'autosharding di un set di dati su un set di lavoratori significa che a ciascun lavoratore viene assegnato un sottoinsieme dell'intero set di dati (se è impostato il tf.data.experimental.AutoShardPolicy
corretto). Questo per garantire che ad ogni passaggio, una dimensione batch globale di elementi del set di dati non sovrapposti venga elaborata da ciascun lavoratore. Il partizionamento automatico ha un paio di opzioni diverse che possono essere specificate usando tf.data.experimental.DistributeOptions
. Si noti che non esiste il partizionamento automatico nella formazione multi-worker con ParameterServerStrategy
e ulteriori informazioni sulla creazione di set di dati con questa strategia sono disponibili nell'esercitazione relativa alla strategia di Parameter Server .
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
Ci sono tre diverse opzioni che puoi impostare per tf.data.experimental.AutoShardPolicy
:
- AUTO: questa è l'opzione predefinita, il che significa che verrà effettuato un tentativo di shard da FILE. Il tentativo di partizionamento da FILE non riesce se non viene rilevato un set di dati basato su file.
tf.distribute
tornerà quindi allo sharding da parte di DATA. Si noti che se il set di dati di input è basato su file ma il numero di file è inferiore al numero di worker, verrà generato unInvalidArgumentError
. In tal caso, impostare esplicitamente la policy suAutoShardPolicy.DATA
o dividere l'origine di input in file più piccoli in modo che il numero di file sia maggiore del numero di worker. FILE: questa è l'opzione se vuoi dividere i file di input su tutti i lavoratori. È necessario utilizzare questa opzione se il numero di file di input è molto maggiore del numero di lavoratori e i dati nei file sono distribuiti uniformemente. Lo svantaggio di questa opzione è avere lavoratori inattivi se i dati nei file non sono distribuiti uniformemente. Se il numero di file è inferiore al numero di worker, verrà generato un
InvalidArgumentError
. In tal caso, impostare in modo esplicito il criterio suAutoShardPolicy.DATA
. Ad esempio, distribuiamo 2 file su 2 lavoratori con 1 replica ciascuno. Il file 1 contiene [0, 1, 2, 3, 4, 5] e il file 2 contiene [6, 7, 8, 9, 10, 11]. Lascia che il numero totale di repliche sincronizzate sia 2 e la dimensione batch globale sia 4.- Operaio 0:
- Lotto 1 = Replica 1: [0, 1]
- Lotto 2 = Replica 1: [2, 3]
- Lotto 3 = Replica 1: [4]
- Lotto 4 = Replica 1: [5]
- Operaio 1:
- Lotto 1 = Replica 2: [6, 7]
- Lotto 2 = Replica 2: [8, 9]
- Lotto 3 = Replica 2: [10]
- Lotto 4 = Replica 2: [11]
DATI: questo eseguirà la suddivisione automatica degli elementi in tutti i lavoratori. Ciascuno dei lavoratori leggerà l'intero set di dati ed elaborerà solo lo shard assegnato. Tutti gli altri frammenti verranno eliminati. Viene generalmente utilizzato se il numero di file di input è inferiore al numero di lavoratori e si desidera una migliore partizionamento orizzontale dei dati tra tutti i lavoratori. Lo svantaggio è che l'intero set di dati verrà letto su ogni lavoratore. Ad esempio, distribuiamo 1 file su 2 lavoratori. Il file 1 contiene [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Lascia che il numero totale di repliche sincronizzate sia 2.
- Operaio 0:
- Lotto 1 = Replica 1: [0, 1]
- Lotto 2 = Replica 1: [4, 5]
- Lotto 3 = Replica 1: [8, 9]
- Operaio 1:
- Lotto 1 = Replica 2: [2, 3]
- Lotto 2 = Replica 2: [6, 7]
- Lotto 3 = Replica 2: [10, 11]
OFF: se disattivi il partizionamento automatico, ogni lavoratore elaborerà tutti i dati. Ad esempio, distribuiamo 1 file su 2 lavoratori. Il file 1 contiene [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Lascia che il numero totale di repliche sincronizzate sia 2. Quindi ogni lavoratore vedrà la seguente distribuzione:
- Operaio 0:
- Lotto 1 = Replica 1: [0, 1]
- Lotto 2 = Replica 1: [2, 3]
- Lotto 3 = Replica 1: [4, 5]
- Lotto 4 = Replica 1: [6, 7]
- Lotto 5 = Replica 1: [8, 9]
Lotto 6 = Replica 1: [10, 11]
Operaio 1:
Lotto 1 = Replica 2: [0, 1]
Lotto 2 = Replica 2: [2, 3]
Lotto 3 = Replica 2: [4, 5]
Lotto 4 = Replica 2: [6, 7]
Lotto 5 = Replica 2: [8, 9]
Lotto 6 = Replica 2: [10, 11]
Prelettura
Per impostazione predefinita, tf.distribute
aggiunge una trasformazione di precaricamento alla fine dell'istanza tf.data.Dataset
fornita dall'utente. L'argomento della trasformazione di precaricamento che è buffer_size
è uguale al numero di repliche sincronizzate.
tf.distribute.Strategy.distribute_datasets_from_function
Utilizzo
Questa API accetta una funzione di input e restituisce un'istanza tf.distribute.DistributedDataset
. La funzione di input che gli utenti passano ha un argomento tf.distribute.InputContext
e dovrebbe restituire un'istanza tf.data.Dataset
. Con questa API, tf.distribute
non apporta ulteriori modifiche all'istanza tf.data.Dataset
dell'utente restituita dalla funzione di input. È responsabilità dell'utente eseguire il batch e lo shard del set di dati. tf.distribute
chiama la funzione di input sul dispositivo CPU di ciascuno dei lavoratori. Oltre a consentire agli utenti di specificare la propria logica di batching e sharding, questa API dimostra anche una migliore scalabilità e prestazioni rispetto a tf.distribute.Strategy.experimental_distribute_dataset
quando viene utilizzata per la formazione di più lavoratori.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Proprietà
Dosaggio
L'istanza tf.data.Dataset
che è il valore restituito della funzione di input deve essere raggruppata utilizzando la dimensione batch per replica. La dimensione batch per replica è la dimensione batch globale divisa per il numero di repliche che partecipano al training di sincronizzazione. Questo perché tf.distribute
chiama la funzione di input sul dispositivo CPU di ciascuno dei lavoratori. Il set di dati creato su un determinato lavoratore dovrebbe essere pronto per l'uso da parte di tutte le repliche su quel lavoratore.
Frammentazione
L'oggetto tf.distribute.InputContext
che viene passato implicitamente come argomento alla funzione di input dell'utente viene creato da tf.distribute
sotto il cofano. Contiene informazioni sul numero di lavoratori, l'ID lavoratore corrente ecc. Questa funzione di input può gestire il partizionamento orizzontale secondo le politiche impostate dall'utente utilizzando queste proprietà che fanno parte dell'oggetto tf.distribute.InputContext
.
Prelettura
tf.distribute
non aggiunge una trasformazione di precaricamento alla fine del tf.data.Dataset
restituito dalla funzione di input fornita dall'utente.
Iteratori distribuiti
Analogamente alle istanze tf.data.Dataset
non distribuite, sarà necessario creare un iteratore sulle istanze tf.distribute.DistributedDataset
per eseguire l'iterazione su di esso e accedere agli elementi in tf.distribute.DistributedDataset
. Di seguito sono riportati i modi in cui puoi creare un tf.distribute.DistributedIterator
e usarlo per addestrare il tuo modello:
Usi
Usa un costrutto Pythonic for loop
È possibile utilizzare un ciclo Pythonic intuitivo per eseguire l'iterazione su tf.distribute.DistributedDataset
. Gli elementi restituiti da tf.distribute.DistributedIterator
possono essere un singolo tf.Tensor
o un tf.distribute.DistributedValues
che contiene un valore per replica. Posizionare il loop all'interno di una tf.function
darà un aumento delle prestazioni. Tuttavia, l' break
e il return
non sono attualmente supportati per un ciclo su un tf.distribute.DistributedDataset
inserito all'interno di un tf.function
.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\020TensorDataset:29" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Usa iter
per creare un iteratore esplicito
Per scorrere gli elementi in un'istanza tf.distribute.DistributedDataset
, puoi creare un tf.distribute.DistributedIterator
utilizzando l'API iter
su di esso. Con un iteratore esplicito, puoi eseguire l'iterazione per un numero fisso di passaggi. Per ottenere l'elemento successivo da un'istanza tf.distribute.DistributedIterator
dist_iterator
, puoi chiamare next(dist_iterator)
, dist_iterator.get_next()
o dist_iterator.get_next_as_optional()
. I primi due sono essenzialmente gli stessi:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
Con next()
o tf.distribute.DistributedIterator.get_next()
, se tf.distribute.DistributedIterator
ha raggiunto la fine, verrà generato un errore OutOfRange. Il client può rilevare l'errore sul lato Python e continuare a svolgere altri lavori come il checkpoint e la valutazione. Tuttavia, questo non funzionerà se stai utilizzando un ciclo di formazione host (ad esempio, esegui più passaggi per tf.function
), che assomiglia a:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
contiene più passaggi avvolgendo il corpo del passaggio all'interno di un tf.range
. In questo caso, diverse iterazioni nel ciclo senza dipendenza potrebbero iniziare in parallelo, quindi un errore OutOfRange può essere attivato nelle iterazioni successive prima che il calcolo delle iterazioni precedenti termini. Una volta generato un errore OutOfRange, tutte le operazioni nella funzione verranno terminate immediatamente. Se questo è un caso che vorresti evitare, un'alternativa che non genera un errore OutOfRange è tf.distribute.DistributedIterator.get_next_as_optional()
. get_next_as_optional
restituisce un tf.experimental.Optional
che contiene l'elemento successivo o nessun valore se tf.distribute.DistributedIterator
ha raggiunto la fine.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') 2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_0" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9 } } attr { key: "metadata" value { s: "\n\020RangeDataset:104" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
Utilizzo della proprietà element_spec
Se si passano gli elementi di un set di dati distribuito a una tf.function
e si desidera una garanzia tf.TypeSpec
, è possibile specificare l'argomento input_signature
della tf.function
. L'output di un set di dati distribuito è tf.distribute.DistributedValues
che può rappresentare l'input per un singolo dispositivo o più dispositivi. Per ottenere il tf.TypeSpec
corrispondente a questo valore distribuito è possibile utilizzare la proprietà element_spec
del set di dati distribuito o dell'oggetto iteratore distribuito.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:122" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
Lotti parziali
I batch parziali vengono rilevati quando le istanze tf.data.Dataset
create dagli utenti possono contenere dimensioni batch che non sono equamente divisibili per il numero di repliche o quando la cardinalità dell'istanza del set di dati non è divisibile per la dimensione del batch. Ciò significa che quando il set di dati viene distribuito su più repliche, la chiamata next
su alcuni iteratori risulterà in un OutOfRangeError. Per gestire questo caso d'uso, tf.distribute
restituisce batch fittizi di dimensione batch 0 sulle repliche che non hanno più dati da elaborare.
Per il caso di singolo lavoratore, se i dati non vengono restituiti dalla chiamata next
sull'iteratore, vengono creati batch fittizi di dimensione batch 0 e utilizzati insieme ai dati reali nel set di dati. Nel caso di batch parziali, l'ultimo batch globale di dati conterrà dati reali insieme a batch di dati fittizi. La condizione di arresto per l'elaborazione dei dati ora controlla se una delle repliche contiene dati. Se non sono presenti dati su nessuna delle repliche, viene generato un errore OutOfRange.
Per il caso multi-worker, il valore booleano che rappresenta la presenza di dati su ciascuno dei worker viene aggregato utilizzando la comunicazione di repliche incrociate e questo viene utilizzato per identificare se tutti i worker hanno terminato l'elaborazione del dataset distribuito. Poiché ciò comporta la comunicazione tra lavoratori, è implicata una penalizzazione delle prestazioni.
Avvertenze
Quando si utilizzano le API
tf.distribute.Strategy.experimental_distribute_dataset
con una configurazione di più operatori, gli utenti passano untf.data.Dataset
che legge dai file. Setf.data.experimental.AutoShardPolicy
è impostato suAUTO
oFILE
, la dimensione batch effettiva per passaggio potrebbe essere inferiore alla dimensione batch globale definita dall'utente. Ciò può verificarsi quando gli elementi rimanenti nel file sono inferiori alla dimensione batch globale. Gli utenti possono esaurire il set di dati senza dipendere dal numero di passaggi da eseguire o impostaretf.data.experimental.AutoShardPolicy
suDATA
per aggirarlo.Le trasformazioni del set di dati con stato non sono attualmente supportate con
tf.distribute
e tutte le operazioni con stato che potrebbe avere il set di dati vengono attualmente ignorate. Ad esempio, se il tuo set di dati ha unmap_fn
che usatf.random.uniform
per ruotare un'immagine, allora hai un grafico del set di dati che dipende dallo stato (cioè il seme casuale) sulla macchina locale in cui viene eseguito il processo python.Le
tf.data.experimental.OptimizationOptions
sperimentali disabilitate per impostazione predefinita possono in determinati contesti, ad esempio se utilizzate insieme atf.distribute
, causare un degrado delle prestazioni. Dovresti abilitarli solo dopo aver verificato che beneficiano delle prestazioni del tuo carico di lavoro in un'impostazione di distribuzione.Fare riferimento a questa guida per come ottimizzare la pipeline di input con
tf.data
in generale. Alcuni suggerimenti aggiuntivi:Se disponi di più worker e stai utilizzando
tf.data.Dataset.list_files
per creare un dataset da tutti i file che corrispondono a uno o più pattern glob, ricorda di impostare l'argomentoseed
o di impostareshuffle=False
in modo che ogni lavoratore partiziona il file in modo coerente.Se la pipeline di input include sia la mescolanza dei dati a livello di record che l'analisi dei dati, a meno che i dati non analizzati siano significativamente più grandi dei dati analizzati (cosa che in genere non è il caso), prima mescola e poi analizza, come mostrato nell'esempio seguente. Ciò può favorire l'utilizzo della memoria e le prestazioni.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
mantiene un buffer interno di elementibuffer_size
, e quindi la riduzionebuffer_size
potrebbe alleviare il problema OOM.L'ordine in cui i dati vengono elaborati dai lavoratori quando si utilizza
tf.distribute.experimental_distribute_dataset
otf.distribute.distribute_datasets_from_function
non è garantito. Questo è in genere necessario se si utilizzatf.distribute
per scalare la previsione. È tuttavia possibile inserire un indice per ogni elemento nel batch e ordinare gli output di conseguenza. Il frammento di codice seguente è un esempio di come ordinare gli output.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46} 2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_4" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9223372036854775807 } } attr { key: "metadata" value { s: "\n\020RangeDataset:162" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } }
Come faccio a distribuire i miei dati se non sto utilizzando un'istanza canonica tf.data.Dataset?
A volte gli utenti non possono utilizzare un tf.data.Dataset
per rappresentare il loro input e successivamente le API sopra menzionate per distribuire il set di dati a più dispositivi. In questi casi è possibile utilizzare tensori grezzi o input da un generatore.
Usa experimental_distribute_values_from_function per input tensoriali arbitrari
strategy.run
accetta tf.distribute.DistributedValues
che è l'output di next(iterator)
. Per passare i valori del tensore, usa experimental_distribute_values_from_function
per costruire tf.distribute.DistributedValues
da tensori grezzi.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
Usa tf.data.Dataset.from_generator se il tuo input proviene da un generatore
Se disponi di una funzione generatore che desideri utilizzare, puoi creare un'istanza tf.data.Dataset
utilizzando l'API from_generator
.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2" op: "FlatMapDataset" input: "TensorDataset/_1" attr { key: "Targuments" value { list { } } } attr { key: "_cardinality" value { i: -2 } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_flat_map_fn_3980" } } } attr { key: "metadata" value { s: "\n\022FlatMapDataset:178" } } attr { key: "output_shapes" value { list { shape { dim { size: 4 } } } } } attr { key: "output_types" value { list { type: DT_FLOAT } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.