Visualizza su TensorFlow.org | Esegui in Google Colab | Visualizza l'origine su GitHub | Scarica quaderno |
Prima di eseguire questo notebook Colab, assicurati che l'acceleratore hardware sia una TPU controllando le impostazioni del notebook: Runtime > Modifica tipo di runtime > Acceleratore hardware > TPU .
Impostare
import tensorflow as tf
import os
import tensorflow_datasets as tfds
/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/requests/__init__.py:104: RequestsDependencyWarning: urllib3 (1.26.8) or chardet (2.3.0)/charset_normalizer (2.0.11) doesn't match a supported version! RequestsDependencyWarning)
Inizializzazione TPU
Le TPU sono in genere lavoratori Cloud TPU, che sono diversi dal processo locale che esegue il programma Python dell'utente. Pertanto, è necessario eseguire alcuni lavori di inizializzazione per connettersi al cluster remoto e inizializzare le TPU. Si noti che l'argomento tpu
di tf.distribute.cluster_resolver.TPUClusterResolver
è un indirizzo speciale solo per Colab. Se stai eseguendo il tuo codice su Google Compute Engine (GCE), dovresti invece passare il nome del tuo Cloud TPU.
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
INFO:tensorflow:Clearing out eager caches INFO:tensorflow:Clearing out eager caches INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.10:8470 INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.10:8470 INFO:tensorflow:Finished initializing TPU system. INFO:tensorflow:Finished initializing TPU system. All devices: [LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU')]
Posizionamento manuale del dispositivo
Dopo aver inizializzato la TPU, puoi utilizzare il posizionamento manuale del dispositivo per posizionare il calcolo su un singolo dispositivo TPU:
a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
with tf.device('/TPU:0'):
c = tf.matmul(a, b)
print("c device: ", c.device)
print(c)
c device: /job:worker/replica:0/task:0/device:TPU:0 tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32)
Strategie distributive
Di solito esegui il tuo modello su più TPU in modo parallelo ai dati. Per distribuire il tuo modello su più TPU (o altri acceleratori), TensorFlow offre diverse strategie di distribuzione. Puoi sostituire la tua strategia di distribuzione e il modello verrà eseguito su un determinato dispositivo (TPU). Consulta la guida alla strategia di distribuzione per ulteriori informazioni.
Per dimostrarlo, crea un oggetto tf.distribute.TPUStrategy
:
strategy = tf.distribute.TPUStrategy(resolver)
INFO:tensorflow:Found TPU system: INFO:tensorflow:Found TPU system: INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)
Per replicare un calcolo in modo che possa essere eseguito in tutti i core TPU, puoi passarlo all'API strategy.run
. Di seguito è riportato un esempio che mostra tutti i core che ricevono gli stessi input (a, b)
ed eseguono la moltiplicazione di matrici su ciascun core in modo indipendente. Gli output saranno i valori di tutte le repliche.
@tf.function
def matmul_fn(x, y):
z = tf.matmul(x, y)
return z
z = strategy.run(matmul_fn, args=(a, b))
print(z)
PerReplica:{ 0: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 1: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 2: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 3: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 4: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 5: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 6: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 7: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32) }
Classificazione sui TPU
Dopo aver coperto i concetti di base, consideriamo un esempio più concreto. Questa sezione mostra come utilizzare la strategia di distribuzione, tf.distribute.TPUStrategy
, per addestrare un modello Keras su una Cloud TPU.
Definire un modello Keras
Inizia con una definizione di un modello Sequential
Keras per la classificazione delle immagini sul set di dati MNIST utilizzando Keras. Non è diverso da quello che useresti se ti alleni su CPU o GPU. Tieni presente che la creazione del modello Keras deve essere all'interno di strategy.scope
, quindi le variabili possono essere create su ciascun dispositivo TPU. Non è necessario che altre parti del codice rientrino nell'ambito della strategia.
def create_model():
return tf.keras.Sequential(
[tf.keras.layers.Conv2D(256, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(256, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(256, activation='relu'),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)])
Carica il set di dati
L'uso efficiente dell'API tf.data.Dataset
è fondamentale quando si utilizza una Cloud TPU, poiché è impossibile utilizzare le Cloud TPU a meno che non sia possibile fornire loro i dati abbastanza rapidamente. È possibile ottenere ulteriori informazioni sulle prestazioni del set di dati nella Guida alle prestazioni della pipeline di input .
Per tutti gli esperimenti tranne quelli più semplici (utilizzando tf.data.Dataset.from_tensor_slices
o altri dati in-graph), devi archiviare tutti i file di dati letti dal set di dati nei bucket di Google Cloud Storage (GCS).
Per la maggior parte dei casi d'uso, si consiglia di convertire i dati nel formato TFRecord
e di utilizzare un tf.data.TFRecordDataset
per leggerli. Controlla il tutorial TFRecord e tf.Example per i dettagli su come farlo. Non è un requisito difficile ed è possibile utilizzare altri lettori di set di dati, come tf.data.FixedLengthRecordDataset
o tf.data.TextLineDataset
.
Puoi caricare interi set di dati di piccole dimensioni in memoria utilizzando tf.data.Dataset.cache
.
Indipendentemente dal formato dei dati utilizzato, si consiglia vivamente di utilizzare file di grandi dimensioni dell'ordine di 100 MB. Ciò è particolarmente importante in questa impostazione di rete, poiché il sovraccarico dell'apertura di un file è notevolmente più elevato.
Come mostrato nel codice seguente, dovresti usare il modulo tensorflow_datasets
per ottenere una copia dei dati di addestramento e test MNIST. Tieni presente che try_gcs
è specificato per utilizzare una copia disponibile in un bucket GCS pubblico. Se non lo specifichi, la TPU non sarà in grado di accedere ai dati scaricati.
def get_dataset(batch_size, is_training=True):
split = 'train' if is_training else 'test'
dataset, info = tfds.load(name='mnist', split=split, with_info=True,
as_supervised=True, try_gcs=True)
# Normalize the input data.
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255.0
return image, label
dataset = dataset.map(scale)
# Only shuffle and repeat the dataset in training. The advantage of having an
# infinite dataset for training is to avoid the potential last partial batch
# in each epoch, so that you don't need to think about scaling the gradients
# based on the actual batch size.
if is_training:
dataset = dataset.shuffle(10000)
dataset = dataset.repeat()
dataset = dataset.batch(batch_size)
return dataset
Addestra il modello utilizzando le API di alto livello di Keras
Puoi addestrare il tuo modello con Keras fit
e compile
API. Non c'è nulla di specifico per TPU in questo passaggio: scrivi il codice come se stessi usando più GPU e una MirroredStrategy
invece di TPUStrategy
. Puoi saperne di più nel tutorial Distributed training with Keras .
with strategy.scope():
model = create_model()
model.compile(optimizer='adam',
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['sparse_categorical_accuracy'])
batch_size = 200
steps_per_epoch = 60000 // batch_size
validation_steps = 10000 // batch_size
train_dataset = get_dataset(batch_size, is_training=True)
test_dataset = get_dataset(batch_size, is_training=False)
model.fit(train_dataset,
epochs=5,
steps_per_epoch=steps_per_epoch,
validation_data=test_dataset,
validation_steps=validation_steps)
Epoch 1/5 300/300 [==============================] - 18s 32ms/step - loss: 0.1433 - sparse_categorical_accuracy: 0.9564 - val_loss: 0.0452 - val_sparse_categorical_accuracy: 0.9859 Epoch 2/5 300/300 [==============================] - 6s 21ms/step - loss: 0.0335 - sparse_categorical_accuracy: 0.9898 - val_loss: 0.0318 - val_sparse_categorical_accuracy: 0.9899 Epoch 3/5 300/300 [==============================] - 6s 21ms/step - loss: 0.0199 - sparse_categorical_accuracy: 0.9935 - val_loss: 0.0397 - val_sparse_categorical_accuracy: 0.9866 Epoch 4/5 300/300 [==============================] - 6s 21ms/step - loss: 0.0109 - sparse_categorical_accuracy: 0.9964 - val_loss: 0.0436 - val_sparse_categorical_accuracy: 0.9892 Epoch 5/5 300/300 [==============================] - 6s 21ms/step - loss: 0.0103 - sparse_categorical_accuracy: 0.9963 - val_loss: 0.0481 - val_sparse_categorical_accuracy: 0.9881 <keras.callbacks.History at 0x7f0d485602e8>
Per ridurre l'overhead di Python e massimizzare le prestazioni della tua TPU, passa l'argomento steps_per_execution
Model.compile
. In questo esempio, aumenta il throughput di circa il 50%:
with strategy.scope():
model = create_model()
model.compile(optimizer='adam',
# Anything between 2 and `steps_per_epoch` could help here.
steps_per_execution = 50,
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['sparse_categorical_accuracy'])
model.fit(train_dataset,
epochs=5,
steps_per_epoch=steps_per_epoch,
validation_data=test_dataset,
validation_steps=validation_steps)
Epoch 1/5 300/300 [==============================] - 12s 41ms/step - loss: 0.1515 - sparse_categorical_accuracy: 0.9537 - val_loss: 0.0416 - val_sparse_categorical_accuracy: 0.9863 Epoch 2/5 300/300 [==============================] - 3s 10ms/step - loss: 0.0366 - sparse_categorical_accuracy: 0.9891 - val_loss: 0.0410 - val_sparse_categorical_accuracy: 0.9875 Epoch 3/5 300/300 [==============================] - 3s 10ms/step - loss: 0.0191 - sparse_categorical_accuracy: 0.9938 - val_loss: 0.0432 - val_sparse_categorical_accuracy: 0.9865 Epoch 4/5 300/300 [==============================] - 3s 10ms/step - loss: 0.0141 - sparse_categorical_accuracy: 0.9951 - val_loss: 0.0447 - val_sparse_categorical_accuracy: 0.9875 Epoch 5/5 300/300 [==============================] - 3s 11ms/step - loss: 0.0093 - sparse_categorical_accuracy: 0.9968 - val_loss: 0.0426 - val_sparse_categorical_accuracy: 0.9884 <keras.callbacks.History at 0x7f0d0463cd68>
Addestra il modello utilizzando un ciclo di addestramento personalizzato
Puoi anche creare e addestrare il tuo modello utilizzando direttamente le API tf.function
e tf.distribute
. È possibile utilizzare l'API strategy.experimental_distribute_datasets_from_function
per distribuire il set di dati a una funzione del set di dati. Si noti che nell'esempio seguente la dimensione batch passata nel set di dati è la dimensione batch per replica anziché la dimensione batch globale. Per saperne di più, consulta il tutorial Formazione personalizzata con tf.distribute.Strategy .
Innanzitutto, crea il modello, i set di dati e le funzioni tf:
# Create the model, optimizer and metrics inside the strategy scope, so that the
# variables can be mirrored on each device.
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam()
training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
'training_accuracy', dtype=tf.float32)
# Calculate per replica batch size, and distribute the datasets on each TPU
# worker.
per_replica_batch_size = batch_size // strategy.num_replicas_in_sync
train_dataset = strategy.experimental_distribute_datasets_from_function(
lambda _: get_dataset(per_replica_batch_size, is_training=True))
@tf.function
def train_step(iterator):
"""The step function for one training step."""
def step_fn(inputs):
"""The computation to run on each TPU device."""
images, labels = inputs
with tf.GradientTape() as tape:
logits = model(images, training=True)
loss = tf.keras.losses.sparse_categorical_crossentropy(
labels, logits, from_logits=True)
loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
training_loss.update_state(loss * strategy.num_replicas_in_sync)
training_accuracy.update_state(labels, logits)
strategy.run(step_fn, args=(next(iterator),))
WARNING:tensorflow:From <ipython-input-1-5625c2a14441>:15: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: rename to distribute_datasets_from_function WARNING:tensorflow:From <ipython-input-1-5625c2a14441>:15: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: rename to distribute_datasets_from_function
Quindi, esegui il ciclo di formazione:
steps_per_eval = 10000 // batch_size
train_iterator = iter(train_dataset)
for epoch in range(5):
print('Epoch: {}/5'.format(epoch))
for step in range(steps_per_epoch):
train_step(train_iterator)
print('Current step: {}, training loss: {}, accuracy: {}%'.format(
optimizer.iterations.numpy(),
round(float(training_loss.result()), 4),
round(float(training_accuracy.result()) * 100, 2)))
training_loss.reset_states()
training_accuracy.reset_states()
Epoch: 0/5 Current step: 300, training loss: 0.1339, accuracy: 95.79% Epoch: 1/5 Current step: 600, training loss: 0.0333, accuracy: 98.91% Epoch: 2/5 Current step: 900, training loss: 0.0176, accuracy: 99.43% Epoch: 3/5 Current step: 1200, training loss: 0.0126, accuracy: 99.61% Epoch: 4/5 Current step: 1500, training loss: 0.0122, accuracy: 99.61%
Miglioramento delle prestazioni con più passaggi all'interno tf.function
È possibile migliorare le prestazioni eseguendo più passaggi all'interno di una tf.function
. Ciò si ottiene avvolgendo la chiamata strategy.run
con un tf.range
all'interno tf.function
e AutoGraph lo convertirà in un tf.while_loop
sul ruolo di lavoro TPU.
Nonostante le prestazioni migliorate, questo metodo presenta dei compromessi rispetto all'esecuzione di un singolo passaggio all'interno tf.function
. L'esecuzione di più passaggi in una tf.function
è meno flessibile: non è possibile eseguire cose avidamente o codice Python arbitrario all'interno dei passaggi.
@tf.function
def train_multiple_steps(iterator, steps):
"""The step function for one training step."""
def step_fn(inputs):
"""The computation to run on each TPU device."""
images, labels = inputs
with tf.GradientTape() as tape:
logits = model(images, training=True)
loss = tf.keras.losses.sparse_categorical_crossentropy(
labels, logits, from_logits=True)
loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
training_loss.update_state(loss * strategy.num_replicas_in_sync)
training_accuracy.update_state(labels, logits)
for _ in tf.range(steps):
strategy.run(step_fn, args=(next(iterator),))
# Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get
# retraced if the value changes.
train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch))
print('Current step: {}, training loss: {}, accuracy: {}%'.format(
optimizer.iterations.numpy(),
round(float(training_loss.result()), 4),
round(float(training_accuracy.result()) * 100, 2)))
Current step: 1800, training loss: 0.0081, accuracy: 99.74%
Prossimi passi
- Documentazione di Google Cloud TPU : come configurare ed eseguire un Google Cloud TPU.
- Notebook Google Cloud TPU Colab : esempi di formazione end-to-end.
- Guida alle prestazioni di Google Cloud TPU : migliora ulteriormente le prestazioni di Cloud TPU regolando i parametri di configurazione di Cloud TPU per la tua applicazione
- Formazione distribuita con TensorFlow : come utilizzare le strategie di distribuzione, incluso
tf.distribute.TPUStrategy
, con esempi che mostrano le migliori pratiche.