Ingresso distribuito

Le API tf.distribute forniscono agli utenti un modo semplice per scalare la formazione da una singola macchina a più macchine. Quando si ridimensiona il proprio modello, gli utenti devono anche distribuire il proprio input su più dispositivi. tf.distribute fornisce API che consentono di distribuire automaticamente l'input su tutti i dispositivi.

Questa guida ti mostrerà i diversi modi in cui puoi creare set di dati distribuiti e iteratori utilizzando le API tf.distribute . Inoltre verranno trattati i seguenti argomenti:

Questa guida non copre l'utilizzo dell'input distribuito con le API Keras.

Set di dati distribuiti

Per utilizzare le API tf.distribute per la scalabilità, si consiglia agli utenti di utilizzare per rappresentare il proprio input. tf.distribute è stato progettato per funzionare in modo efficiente con (ad esempio, precaricamento automatico dei dati su ciascun dispositivo acceleratore) con ottimizzazioni delle prestazioni regolarmente incorporate nell'implementazione. Se si dispone di un caso d'uso per l'utilizzo di qualcosa di diverso da , fare riferimento a una sezione successiva in questa guida. In un ciclo di addestramento non distribuito, gli utenti creano prima un'istanza e quindi ripetono gli elementi. Per esempio:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

global_batch_size = 16
# Create a object.
=[1.], [1.])).repeat(100).batch(global_batch_size)

def train_step(inputs):
, labels = inputs
return labels - 0.3 * features

# Iterate over the dataset using the construct.
for inputs in dataset:
Per consentire agli utenti di utilizzare la strategia tf.distribute con modifiche minime al codice esistente di un utente, sono state introdotte due API che distribuirebbero un'istanza e restituirebbero un oggetto dataset distribuito. Un utente può quindi eseguire l'iterazione su questa istanza del set di dati distribuito e addestrare il proprio modello come prima. Esaminiamo ora le due API: tf.distribute.Strategy.experimental_distribute_dataset e tf.distribute.Strategy.distribute_datasets_from_function in modo più dettagliato:



Questa API accetta un'istanza come input e restituisce un'istanza tf.distribute.DistributedDataset . È necessario eseguire il batch del set di dati di input con un valore uguale alla dimensione batch globale. Questa dimensione batch globale è il numero di campioni che desideri elaborare su tutti i dispositivi in ​​1 passaggio. Puoi scorrere questo set di dati distribuito in modo Pythonic o creare un iteratore usando iter . L'oggetto restituito non è un'istanza e non supporta altre API che trasformano o ispezionano il set di dati in alcun modo. Questa è l'API consigliata se non disponi di modi specifici in cui desideri suddividere l'input su repliche diverse.

global_batch_size = 16
= tf.distribute.MirroredStrategy()

=[1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
= mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
tf.distribute ribatta l'istanza di input con una nuova dimensione del batch uguale alla dimensione del batch globale divisa per il numero di repliche sincronizzate. Il numero di repliche sincronizzate è uguale al numero di dispositivi che partecipano al gradiente allriduce durante l'allenamento. Quando un utente effettua la chiamata next sull'iteratore distribuito, viene restituita una dimensione batch di dati per replica su ciascuna replica. La cardinalità del set di dati ribattuto sarà sempre un multiplo del numero di repliche. Qui ci sono un paio di esempi:

  •, drop_remainder=False)

    • Senza distribuzione:
    • Lotto 1: [0, 1, 2, 3]
    • Lotto 2: [4, 5]
    • Con distribuzione su 2 repliche. L'ultimo batch ([4, 5]) è suddiviso tra 2 repliche.

    • Lotto 1:

      • Replica 1:[0, 1]
      • Replica 2:[2, 3]
    • Lotto 2:

      • Replica 2: [4]
      • Replica 2: [5]

    • Senza distribuzione:
    • Lotto 1: [[0], [1], [2], [3]]
    • Con distribuzione su 5 repliche:
    • Lotto 1:
      • Replica 1: [0]
      • Replica 2: [1]
      • Replica 3: [2]
      • Replica 4: [3]
      • Replica 5: []

    • Senza distribuzione:
    • Lotto 1: [0, 1, 2, 3]
    • Lotto 2: [4, 5, 6, 7]
    • Con distribuzione su 3 repliche:
    • Lotto 1:
      • Replica 1: [0, 1]
      • Replica 2: [2, 3]
      • Replica 3: []
    • Lotto 2:
      • Replica 1: [4, 5]
      • Replica 2: [6, 7]
      • Replica 3: []

Il rebatching del set di dati ha una complessità spaziale che aumenta linearmente con il numero di repliche. Ciò significa che per il caso d'uso della formazione per più lavoratori, la pipeline di input può incorrere in errori OOM.


tf.distribute inoltre il partizionamento automatico del set di dati di input nella formazione multi-worker con MultiWorkerMirroredStrategy e TPUStrategy . Ogni set di dati viene creato sul dispositivo CPU del lavoratore. L'autosharding di un set di dati su un set di lavoratori significa che a ciascun lavoratore viene assegnato un sottoinsieme dell'intero set di dati (se è impostato il corretto). Questo per garantire che ad ogni passaggio, una dimensione batch globale di elementi del set di dati non sovrapposti venga elaborata da ciascun lavoratore. Il partizionamento automatico ha un paio di opzioni diverse che possono essere specificate usando . Si noti che non esiste il partizionamento automatico nella formazione multi-worker con ParameterServerStrategy e ulteriori informazioni sulla creazione di set di dati con questa strategia sono disponibili nell'esercitazione relativa alla strategia di Parameter Server .

dataset =[1.],[1.])).repeat(64).batch(16)
.experimental_distribute.auto_shard_policy =
= dataset.with_options(options)

Ci sono tre diverse opzioni che puoi impostare per :

  • AUTO: questa è l'opzione predefinita, il che significa che verrà effettuato un tentativo di shard da FILE. Il tentativo di partizionamento da FILE non riesce se non viene rilevato un set di dati basato su file. tf.distribute tornerà quindi allo sharding da parte di DATA. Si noti che se il set di dati di input è basato su file ma il numero di file è inferiore al numero di worker, verrà generato un InvalidArgumentError . In tal caso, impostare esplicitamente la policy su AutoShardPolicy.DATA o dividere l'origine di input in file più piccoli in modo che il numero di file sia maggiore del numero di worker.
  • FILE: questa è l'opzione se vuoi dividere i file di input su tutti i lavoratori. È necessario utilizzare questa opzione se il numero di file di input è molto maggiore del numero di lavoratori e i dati nei file sono distribuiti uniformemente. Lo svantaggio di questa opzione è avere lavoratori inattivi se i dati nei file non sono distribuiti uniformemente. Se il numero di file è inferiore al numero di worker, verrà generato un InvalidArgumentError . In tal caso, impostare in modo esplicito il criterio su AutoShardPolicy.DATA . Ad esempio, distribuiamo 2 file su 2 lavoratori con 1 replica ciascuno. Il file 1 contiene [0, 1, 2, 3, 4, 5] e il file 2 contiene [6, 7, 8, 9, 10, 11]. Lascia che il numero totale di repliche sincronizzate sia 2 e la dimensione batch globale sia 4.

    • Operaio 0:
    • Lotto 1 = Replica 1: [0, 1]
    • Lotto 2 = Replica 1: [2, 3]
    • Lotto 3 = Replica 1: [4]
    • Lotto 4 = Replica 1: [5]
    • Operaio 1:
    • Lotto 1 = Replica 2: [6, 7]
    • Lotto 2 = Replica 2: [8, 9]
    • Lotto 3 = Replica 2: [10]
    • Lotto 4 = Replica 2: [11]
  • DATI: questo eseguirà la suddivisione automatica degli elementi in tutti i lavoratori. Ciascuno dei lavoratori leggerà l'intero set di dati ed elaborerà solo lo shard assegnato. Tutti gli altri frammenti verranno eliminati. Viene generalmente utilizzato se il numero di file di input è inferiore al numero di lavoratori e si desidera una migliore partizionamento orizzontale dei dati tra tutti i lavoratori. Lo svantaggio è che l'intero set di dati verrà letto su ogni lavoratore. Ad esempio, distribuiamo 1 file su 2 lavoratori. Il file 1 contiene [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Lascia che il numero totale di repliche sincronizzate sia 2.

    • Operaio 0:
    • Lotto 1 = Replica 1: [0, 1]
    • Lotto 2 = Replica 1: [4, 5]
    • Lotto 3 = Replica 1: [8, 9]
    • Operaio 1:
    • Lotto 1 = Replica 2: [2, 3]
    • Lotto 2 = Replica 2: [6, 7]
    • Lotto 3 = Replica 2: [10, 11]
  • OFF: se disattivi il partizionamento automatico, ogni lavoratore elaborerà tutti i dati. Ad esempio, distribuiamo 1 file su 2 lavoratori. Il file 1 contiene [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Lascia che il numero totale di repliche sincronizzate sia 2. Quindi ogni lavoratore vedrà la seguente distribuzione:

    • Operaio 0:
    • Lotto 1 = Replica 1: [0, 1]
    • Lotto 2 = Replica 1: [2, 3]
    • Lotto 3 = Replica 1: [4, 5]
    • Lotto 4 = Replica 1: [6, 7]
    • Lotto 5 = Replica 1: [8, 9]
    • Lotto 6 = Replica 1: [10, 11]

    • Operaio 1:

    • Lotto 1 = Replica 2: [0, 1]

    • Lotto 2 = Replica 2: [2, 3]

    • Lotto 3 = Replica 2: [4, 5]

    • Lotto 4 = Replica 2: [6, 7]

    • Lotto 5 = Replica 2: [8, 9]

    • Lotto 6 = Replica 2: [10, 11]


Per impostazione predefinita, tf.distribute aggiunge una trasformazione di precaricamento alla fine dell'istanza fornita dall'utente. L'argomento della trasformazione di precaricamento che è buffer_size è uguale al numero di repliche sincronizzate.



Questa API accetta una funzione di input e restituisce un'istanza tf.distribute.DistributedDataset . La funzione di input che gli utenti passano ha un argomento tf.distribute.InputContext e dovrebbe restituire un'istanza . Con questa API, tf.distribute non apporta ulteriori modifiche all'istanza dell'utente restituita dalla funzione di input. È responsabilità dell'utente eseguire il batch e lo shard del set di dati. tf.distribute chiama la funzione di input sul dispositivo CPU di ciascuno dei lavoratori. Oltre a consentire agli utenti di specificare la propria logica di batching e sharding, questa API dimostra anche una migliore scalabilità e prestazioni rispetto a tf.distribute.Strategy.experimental_distribute_dataset quando viene utilizzata per la formazione di più lavoratori.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
= input_context.get_per_replica_batch_size(global_batch_size)
= dataset.shard(
.num_input_pipelines, input_context.input_pipeline_id)
= dataset.batch(batch_size)
= dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset

= mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)



L'istanza che è il valore restituito della funzione di input deve essere raggruppata utilizzando la dimensione batch per replica. La dimensione batch per replica è la dimensione batch globale divisa per il numero di repliche che partecipano al training di sincronizzazione. Questo perché tf.distribute chiama la funzione di input sul dispositivo CPU di ciascuno dei lavoratori. Il set di dati creato su un determinato lavoratore dovrebbe essere pronto per l'uso da parte di tutte le repliche su quel lavoratore.


L'oggetto tf.distribute.InputContext che viene passato implicitamente come argomento alla funzione di input dell'utente viene creato da tf.distribute sotto il cofano. Contiene informazioni sul numero di lavoratori, l'ID lavoratore corrente ecc. Questa funzione di input può gestire il partizionamento orizzontale secondo le politiche impostate dall'utente utilizzando queste proprietà che fanno parte dell'oggetto tf.distribute.InputContext .


tf.distribute non aggiunge una trasformazione di precaricamento alla fine del restituito dalla funzione di input fornita dall'utente.

Iteratori distribuiti

Analogamente alle istanze non distribuite, sarà necessario creare un iteratore sulle istanze tf.distribute.DistributedDataset per eseguire l'iterazione su di esso e accedere agli elementi in tf.distribute.DistributedDataset . Di seguito sono riportati i modi in cui puoi creare un tf.distribute.DistributedIterator e usarlo per addestrare il tuo modello:


Usa un costrutto Pythonic for loop

È possibile utilizzare un ciclo Pythonic intuitivo per eseguire l'iterazione su tf.distribute.DistributedDataset . Gli elementi restituiti da tf.distribute.DistributedIterator possono essere un singolo tf.Tensor o un tf.distribute.DistributedValues che contiene un valore per replica. Posizionare il loop all'interno di una tf.function darà un aumento delle prestazioni. Tuttavia, l' break e il return non sono attualmente supportati per un ciclo su un tf.distribute.DistributedDataset inserito all'interno di un tf.function .

global_batch_size = 16
= tf.distribute.MirroredStrategy()

= mirrored_strategy.experimental_distribute_dataset(dataset)

def train_step(inputs):
, labels = inputs
return labels - 0.3 * features

for x in dist_dataset:
# train_step trains the model using the dataset elements
=, args=(x,))
print("Loss is ", loss)
Usa iter per creare un iteratore esplicito

Per scorrere gli elementi in un'istanza tf.distribute.DistributedDataset , puoi creare un tf.distribute.DistributedIterator utilizzando l'API iter su di esso. Con un iteratore esplicito, puoi eseguire l'iterazione per un numero fisso di passaggi. Per ottenere l'elemento successivo da un'istanza tf.distribute.DistributedIterator dist_iterator , puoi chiamare next(dist_iterator) , dist_iterator.get_next() o dist_iterator.get_next_as_optional() . I primi due sono essenzialmente gli stessi:

num_epochs = 10
= 5
for epoch in range(num_epochs):
= iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
=, args=(next(dist_iterator),))
# which is the same as
# loss =, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Con next() o tf.distribute.DistributedIterator.get_next() , se tf.distribute.DistributedIterator ha raggiunto la fine, verrà generato un errore OutOfRange. Il client può rilevare l'errore sul lato Python e continuare a svolgere altri lavori come il checkpoint e la valutazione. Tuttavia, questo non funzionerà se stai utilizzando un ciclo di formazione host (ad esempio, esegui più passaggi per tf.function ), che assomiglia a:

def train_fn(iterator):
for _ in tf.range(steps_per_loop):
.run(step_fn, args=(next(iterator),))

train_fn contiene più passaggi avvolgendo il corpo del passaggio all'interno di un tf.range . In questo caso, diverse iterazioni nel ciclo senza dipendenza potrebbero iniziare in parallelo, quindi un errore OutOfRange può essere attivato nelle iterazioni successive prima che il calcolo delle iterazioni precedenti termini. Una volta generato un errore OutOfRange, tutte le operazioni nella funzione verranno terminate immediatamente. Se questo è un caso che vorresti evitare, un'alternativa che non genera un errore OutOfRange è tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional restituisce un tf.experimental.Optional che contiene l'elemento successivo o nessun valore se tf.distribute.DistributedIterator ha raggiunto la fine.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
= 4
= 5
= tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

= iter(strategy.experimental_distribute_dataset(dataset))

def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
= distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
= x:x, args=(optional_data.get_value(),))
Utilizzo della proprietà element_spec

Se si passano gli elementi di un set di dati distribuito a una tf.function e si desidera una garanzia tf.TypeSpec , è possibile specificare l'argomento input_signature della tf.function . L'output di un set di dati distribuito è tf.distribute.DistributedValues che può rappresentare l'input per un singolo dispositivo o più dispositivi. Per ottenere il tf.TypeSpec corrispondente a questo valore distribuito è possibile utilizzare la proprietà element_spec del set di dati distribuito o dell'oggetto iteratore distribuito.

global_batch_size = 16
= 5
= 5
= tf.distribute.MirroredStrategy()

= mirrored_strategy.experimental_distribute_dataset(dataset)

def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs

return, args=(per_replica_inputs,))

for _ in range(epochs):
= iter(dist_dataset)
for _ in range(steps_per_epoch):
= train_step(next(iterator))
Lotti parziali

I batch parziali vengono rilevati quando le istanze create dagli utenti possono contenere dimensioni batch che non sono equamente divisibili per il numero di repliche o quando la cardinalità dell'istanza del set di dati non è divisibile per la dimensione del batch. Ciò significa che quando il set di dati viene distribuito su più repliche, la chiamata next su alcuni iteratori risulterà in un OutOfRangeError. Per gestire questo caso d'uso, tf.distribute restituisce batch fittizi di dimensione batch 0 sulle repliche che non hanno più dati da elaborare.

Per il caso di singolo lavoratore, se i dati non vengono restituiti dalla chiamata next sull'iteratore, vengono creati batch fittizi di dimensione batch 0 e utilizzati insieme ai dati reali nel set di dati. Nel caso di batch parziali, l'ultimo batch globale di dati conterrà dati reali insieme a batch di dati fittizi. La condizione di arresto per l'elaborazione dei dati ora controlla se una delle repliche contiene dati. Se non sono presenti dati su nessuna delle repliche, viene generato un errore OutOfRange.

Per il caso multi-worker, il valore booleano che rappresenta la presenza di dati su ciascuno dei worker viene aggregato utilizzando la comunicazione di repliche incrociate e questo viene utilizzato per identificare se tutti i worker hanno terminato l'elaborazione del dataset distribuito. Poiché ciò comporta la comunicazione tra lavoratori, è implicata una penalizzazione delle prestazioni.


  • Quando si utilizzano le API tf.distribute.Strategy.experimental_distribute_dataset con una configurazione di più operatori, gli utenti passano un che legge dai file. Se è impostato su AUTO o FILE , la dimensione batch effettiva per passaggio potrebbe essere inferiore alla dimensione batch globale definita dall'utente. Ciò può verificarsi quando gli elementi rimanenti nel file sono inferiori alla dimensione batch globale. Gli utenti possono esaurire il set di dati senza dipendere dal numero di passaggi da eseguire o impostare su DATA per aggirarlo.

  • Le trasformazioni del set di dati con stato non sono attualmente supportate con tf.distribute e tutte le operazioni con stato che potrebbe avere il set di dati vengono attualmente ignorate. Ad esempio, se il tuo set di dati ha un map_fn che usa tf.random.uniform per ruotare un'immagine, allora hai un grafico del set di dati che dipende dallo stato (cioè il seme casuale) sulla macchina locale in cui viene eseguito il processo python.

  • Le sperimentali disabilitate per impostazione predefinita possono in determinati contesti, ad esempio se utilizzate insieme a tf.distribute , causare un degrado delle prestazioni. Dovresti abilitarli solo dopo aver verificato che beneficiano delle prestazioni del tuo carico di lavoro in un'impostazione di distribuzione.

  • Fare riferimento a questa guida per come ottimizzare la pipeline di input con in generale. Alcuni suggerimenti aggiuntivi:

    • Se disponi di più worker e stai utilizzando per creare un dataset da tutti i file che corrispondono a uno o più pattern glob, ricorda di impostare l'argomento seed o di impostare shuffle=False in modo che ogni lavoratore partiziona il file in modo coerente.

    • Se la pipeline di input include sia la mescolanza dei dati a livello di record che l'analisi dei dati, a meno che i dati non analizzati siano significativamente più grandi dei dati analizzati (cosa che in genere non è il caso), prima mescola e poi analizza, come mostrato nell'esempio seguente. Ciò può favorire l'utilizzo della memoria e le prestazioni.

d =, shuffle=False)
= d.shard(num_workers, worker_index)
= d.repeat(num_epochs)
= d.shuffle(shuffle_buffer_size)
= d.interleave(,
=num_readers, block_length=1)
=, num_parallel_calls=num_map_threads)
  •, seed=None, reshuffle_each_iteration=None) mantiene un buffer interno di elementi buffer_size , e quindi la riduzione buffer_size potrebbe alleviare il problema OOM.

  • L'ordine in cui i dati vengono elaborati dai lavoratori quando si utilizza tf.distribute.experimental_distribute_dataset o tf.distribute.distribute_datasets_from_function non è garantito. Questo è in genere necessario se si utilizza tf.distribute per scalare la previsione. È tuttavia possibile inserire un indice per ogni elemento nel batch e ordinare gli output di conseguenza. Il frammento di codice seguente è un esempio di come ordinare gli output.

mirrored_strategy = tf.distribute.MirroredStrategy()
= 24
= 6
= mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
= 2 * inputs
return index, outputs

= {}
for index, inputs in dist_dataset:
, outputs =, args=(index, inputs))
= list(mirrored_strategy.experimental_local_results(output_index))
= []
for a in indices:
= list(mirrored_strategy.experimental_local_results(outputs))
= []
for a in outputs:
for i, value in zip(rindices, routputs):
[i] = value

Come faccio a distribuire i miei dati se non sto utilizzando un'istanza canonica

A volte gli utenti non possono utilizzare un per rappresentare il loro input e successivamente le API sopra menzionate per distribuire il set di dati a più dispositivi. In questi casi è possibile utilizzare tensori grezzi o input da un generatore.

Usa experimental_distribute_values_from_function per input tensoriali arbitrari accetta tf.distribute.DistributedValues che è l'output di next(iterator) . Per passare i valori del tensore, usa experimental_distribute_values_from_function per costruire tf.distribute.DistributedValues da tensori grezzi.

mirrored_strategy = tf.distribute.MirroredStrategy()
= mirrored_strategy.extended.worker_devices

def value_fn(ctx):
return tf.constant(1.0)

= mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
= x:x, args=(distributed_values,))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

Usa se il tuo input proviene da un generatore

Se disponi di una funzione generatore che desideri utilizzare, puoi creare un'istanza utilizzando l'API from_generator .

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)

# use Dataset.from_generator
, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
= mirrored_strategy.experimental_distribute_dataset(dataset)
= iter(dist_dataset)
for _ in range(4):
.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.