Zobacz na TensorFlow.org | Uruchom w Google Colab | Wyświetl źródło na GitHub | Pobierz notatnik |
Przed uruchomieniem tego notatnika Colab upewnij się, że akcelerator sprzętowy to TPU, sprawdzając ustawienia notatnika: Środowisko wykonawcze > Zmień typ środowiska wykonawczego > Akcelerator sprzętowy > TPU .
Ustawiać
import tensorflow as tf
import os
import tensorflow_datasets as tfds
/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/requests/__init__.py:104: RequestsDependencyWarning: urllib3 (1.26.8) or chardet (2.3.0)/charset_normalizer (2.0.11) doesn't match a supported version! RequestsDependencyWarning)
Inicjalizacja TPU
TPU to zwykle procesy robocze Cloud TPU, które różnią się od lokalnego procesu, w którym uruchamiany jest program użytkownika w języku Python. Dlatego musisz wykonać pewne prace inicjujące, aby połączyć się ze zdalnym klastrem i zainicjować jednostki TPU. Zauważ, że argument tf.distribute.cluster_resolver.TPUClusterResolver
tpu
specjalny adres tylko dla Colab. Jeśli używasz kodu w Google Compute Engine (GCE), zamiast tego powinieneś przekazać nazwę swojego Cloud TPU.
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
INFO:tensorflow:Clearing out eager caches INFO:tensorflow:Clearing out eager caches INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.10:8470 INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.10:8470 INFO:tensorflow:Finished initializing TPU system. INFO:tensorflow:Finished initializing TPU system. All devices: [LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU')]
Ręczne umieszczanie urządzenia
Po zainicjowaniu TPU możesz użyć ręcznego umieszczania urządzenia, aby umieścić obliczenia na jednym urządzeniu TPU:
a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
with tf.device('/TPU:0'):
c = tf.matmul(a, b)
print("c device: ", c.device)
print(c)
c device: /job:worker/replica:0/task:0/device:TPU:0 tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32)
Strategie dystrybucji
Zwykle uruchamiasz swój model na wielu TPU w sposób równoległy do danych. Aby dystrybuować model na wielu TPU (lub innych akceleratorach), TensorFlow oferuje kilka strategii dystrybucji. Możesz zastąpić swoją strategię dystrybucji, a model będzie działał na dowolnym urządzeniu (TPU). Sprawdź przewodnik po strategii dystrybucji, aby uzyskać więcej informacji.
Aby to zademonstrować, utwórz obiekt tf.distribute.TPUStrategy
:
strategy = tf.distribute.TPUStrategy(resolver)
INFO:tensorflow:Found TPU system: INFO:tensorflow:Found TPU system: INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)
Aby zreplikować obliczenie, aby mogło działać we wszystkich rdzeniach TPU, możesz przekazać je do interfejsu API strategy.run
. Poniżej znajduje się przykład, który pokazuje wszystkie rdzenie otrzymujące te same dane wejściowe (a, b)
i wykonujące niezależnie mnożenie macierzy na każdym rdzeniu. Dane wyjściowe będą wartościami ze wszystkich replik.
@tf.function
def matmul_fn(x, y):
z = tf.matmul(x, y)
return z
z = strategy.run(matmul_fn, args=(a, b))
print(z)
PerReplica:{ 0: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 1: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 2: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 3: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 4: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 5: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 6: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 7: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32) }
Klasyfikacja na TPU
Po omówieniu podstawowych pojęć rozważ bardziej konkretny przykład. W tej sekcji pokazano, jak używać strategii dystrybucji — tf.distribute.TPUStrategy
— do trenowania modelu Keras w Cloud TPU.
Zdefiniuj model Kerasa
Zacznij od definicji modelu Sequential
Keras do klasyfikacji obrazów w zestawie danych MNIST przy użyciu Keras. Nie różni się to od tego, czego użyłbyś, gdybyś trenował na procesorach lub kartach graficznych. Pamiętaj, że tworzenie modelu Keras musi znajdować się wewnątrz strategy.scope
, więc zmienne mogą być tworzone na każdym urządzeniu TPU. Inne części kodu nie muszą znajdować się w zakresie strategii.
def create_model():
return tf.keras.Sequential(
[tf.keras.layers.Conv2D(256, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(256, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(256, activation='relu'),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)])
Załaduj zbiór danych
Wydajne korzystanie z interfejsu API tf.data.Dataset
ma kluczowe znaczenie podczas korzystania z Cloud TPU, ponieważ nie można korzystać z Cloud TPU, chyba że można je odpowiednio szybko podać danymi. Więcej informacji na temat wydajności zestawu danych można znaleźć w przewodniku po wydajności potoku danych wejściowych .
W przypadku wszystkich, oprócz najprostszych eksperymentów (z wykorzystaniem tf.data.Dataset.from_tensor_slices
lub innych danych w wykresie), wszystkie pliki danych odczytywane przez zbiór danych należy przechowywać w zasobnikach Google Cloud Storage (GCS).
W większości przypadków zaleca się przekonwertowanie danych do formatu TFRecord
i użycie tf.data.TFRecordDataset
do ich odczytania. Sprawdź samouczek TFRecord i tf.Example, aby dowiedzieć się, jak to zrobić. Nie jest to trudne wymaganie i można używać innych czytników zestawów danych, takich jak tf.data.FixedLengthRecordDataset
lub tf.data.TextLineDataset
.
Możesz załadować całe małe zestawy danych do pamięci za pomocą tf.data.Dataset.cache
.
Niezależnie od używanego formatu danych, zdecydowanie zaleca się używanie dużych plików, rzędu 100 MB. Jest to szczególnie ważne w tym środowisku sieciowym, ponieważ obciążenie związane z otwarciem pliku jest znacznie wyższe.
Jak pokazano w poniższym kodzie, należy użyć modułu tensorflow_datasets
, aby uzyskać kopię danych treningowych i testowych MNIST. Pamiętaj, że try_gcs
jest określony tak, aby używać kopii, która jest dostępna w publicznym zasobniku GCS. Jeśli tego nie określisz, TPU nie będzie mieć dostępu do pobranych danych.
def get_dataset(batch_size, is_training=True):
split = 'train' if is_training else 'test'
dataset, info = tfds.load(name='mnist', split=split, with_info=True,
as_supervised=True, try_gcs=True)
# Normalize the input data.
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255.0
return image, label
dataset = dataset.map(scale)
# Only shuffle and repeat the dataset in training. The advantage of having an
# infinite dataset for training is to avoid the potential last partial batch
# in each epoch, so that you don't need to think about scaling the gradients
# based on the actual batch size.
if is_training:
dataset = dataset.shuffle(10000)
dataset = dataset.repeat()
dataset = dataset.batch(batch_size)
return dataset
Trenuj model za pomocą API wysokiego poziomu Keras
Możesz trenować swój model za pomocą interfejsów API Keras fit
i compile
. W tym kroku nie ma nic specyficznego dla TPU — piszesz kod tak, jakbyś używał wielu procesorów graficznych i MirroredStrategy
zamiast TPUStrategy
. Więcej informacji znajdziesz w samouczku Szkolenie rozproszone z Keras .
with strategy.scope():
model = create_model()
model.compile(optimizer='adam',
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['sparse_categorical_accuracy'])
batch_size = 200
steps_per_epoch = 60000 // batch_size
validation_steps = 10000 // batch_size
train_dataset = get_dataset(batch_size, is_training=True)
test_dataset = get_dataset(batch_size, is_training=False)
model.fit(train_dataset,
epochs=5,
steps_per_epoch=steps_per_epoch,
validation_data=test_dataset,
validation_steps=validation_steps)
Epoch 1/5 300/300 [==============================] - 18s 32ms/step - loss: 0.1433 - sparse_categorical_accuracy: 0.9564 - val_loss: 0.0452 - val_sparse_categorical_accuracy: 0.9859 Epoch 2/5 300/300 [==============================] - 6s 21ms/step - loss: 0.0335 - sparse_categorical_accuracy: 0.9898 - val_loss: 0.0318 - val_sparse_categorical_accuracy: 0.9899 Epoch 3/5 300/300 [==============================] - 6s 21ms/step - loss: 0.0199 - sparse_categorical_accuracy: 0.9935 - val_loss: 0.0397 - val_sparse_categorical_accuracy: 0.9866 Epoch 4/5 300/300 [==============================] - 6s 21ms/step - loss: 0.0109 - sparse_categorical_accuracy: 0.9964 - val_loss: 0.0436 - val_sparse_categorical_accuracy: 0.9892 Epoch 5/5 300/300 [==============================] - 6s 21ms/step - loss: 0.0103 - sparse_categorical_accuracy: 0.9963 - val_loss: 0.0481 - val_sparse_categorical_accuracy: 0.9881 <keras.callbacks.History at 0x7f0d485602e8>
Aby zmniejszyć obciążenie Pythona i zmaksymalizować wydajność TPU, przekaż argument — steps_per_execution
— do Model.compile
. W tym przykładzie zwiększa przepustowość o około 50%:
with strategy.scope():
model = create_model()
model.compile(optimizer='adam',
# Anything between 2 and `steps_per_epoch` could help here.
steps_per_execution = 50,
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['sparse_categorical_accuracy'])
model.fit(train_dataset,
epochs=5,
steps_per_epoch=steps_per_epoch,
validation_data=test_dataset,
validation_steps=validation_steps)
Epoch 1/5 300/300 [==============================] - 12s 41ms/step - loss: 0.1515 - sparse_categorical_accuracy: 0.9537 - val_loss: 0.0416 - val_sparse_categorical_accuracy: 0.9863 Epoch 2/5 300/300 [==============================] - 3s 10ms/step - loss: 0.0366 - sparse_categorical_accuracy: 0.9891 - val_loss: 0.0410 - val_sparse_categorical_accuracy: 0.9875 Epoch 3/5 300/300 [==============================] - 3s 10ms/step - loss: 0.0191 - sparse_categorical_accuracy: 0.9938 - val_loss: 0.0432 - val_sparse_categorical_accuracy: 0.9865 Epoch 4/5 300/300 [==============================] - 3s 10ms/step - loss: 0.0141 - sparse_categorical_accuracy: 0.9951 - val_loss: 0.0447 - val_sparse_categorical_accuracy: 0.9875 Epoch 5/5 300/300 [==============================] - 3s 11ms/step - loss: 0.0093 - sparse_categorical_accuracy: 0.9968 - val_loss: 0.0426 - val_sparse_categorical_accuracy: 0.9884 <keras.callbacks.History at 0x7f0d0463cd68>
Trenuj model, korzystając z niestandardowej pętli szkoleniowej
Możesz również tworzyć i trenować swój model bezpośrednio przy użyciu interfejsów API tf.function
i tf.distribute
. Możesz użyć funkcji API strategy.experimental_distribute_datasets_from_function
do dystrybucji zestawu danych przy użyciu funkcji zestawu danych. Należy zauważyć, że w poniższym przykładzie rozmiar partii przekazany do zestawu danych to rozmiar partii na replikę, a nie globalny rozmiar partii. Aby dowiedzieć się więcej, zapoznaj się ze szkoleniem niestandardowym z samouczkiem tf.distribute.Strategy .
Najpierw utwórz model, zbiory danych i funkcje tf.functions:
# Create the model, optimizer and metrics inside the strategy scope, so that the
# variables can be mirrored on each device.
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam()
training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
'training_accuracy', dtype=tf.float32)
# Calculate per replica batch size, and distribute the datasets on each TPU
# worker.
per_replica_batch_size = batch_size // strategy.num_replicas_in_sync
train_dataset = strategy.experimental_distribute_datasets_from_function(
lambda _: get_dataset(per_replica_batch_size, is_training=True))
@tf.function
def train_step(iterator):
"""The step function for one training step."""
def step_fn(inputs):
"""The computation to run on each TPU device."""
images, labels = inputs
with tf.GradientTape() as tape:
logits = model(images, training=True)
loss = tf.keras.losses.sparse_categorical_crossentropy(
labels, logits, from_logits=True)
loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
training_loss.update_state(loss * strategy.num_replicas_in_sync)
training_accuracy.update_state(labels, logits)
strategy.run(step_fn, args=(next(iterator),))
WARNING:tensorflow:From <ipython-input-1-5625c2a14441>:15: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: rename to distribute_datasets_from_function WARNING:tensorflow:From <ipython-input-1-5625c2a14441>:15: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: rename to distribute_datasets_from_function
Następnie uruchom pętlę treningową:
steps_per_eval = 10000 // batch_size
train_iterator = iter(train_dataset)
for epoch in range(5):
print('Epoch: {}/5'.format(epoch))
for step in range(steps_per_epoch):
train_step(train_iterator)
print('Current step: {}, training loss: {}, accuracy: {}%'.format(
optimizer.iterations.numpy(),
round(float(training_loss.result()), 4),
round(float(training_accuracy.result()) * 100, 2)))
training_loss.reset_states()
training_accuracy.reset_states()
Epoch: 0/5 Current step: 300, training loss: 0.1339, accuracy: 95.79% Epoch: 1/5 Current step: 600, training loss: 0.0333, accuracy: 98.91% Epoch: 2/5 Current step: 900, training loss: 0.0176, accuracy: 99.43% Epoch: 3/5 Current step: 1200, training loss: 0.0126, accuracy: 99.61% Epoch: 4/5 Current step: 1500, training loss: 0.0122, accuracy: 99.61%
Poprawa wydajności dzięki wielu krokom wewnątrz tf.function
Możesz poprawić wydajność, wykonując wiele kroków w ramach tf.function
. Osiąga się to poprzez zawinięcie wywołania strategy.run
do tf.range
wewnątrz tf.function
, a AutoGraph przekonwertuje je na tf.while_loop
w procesie roboczym TPU.
Pomimo zwiększonej wydajności, ta metoda ma pewne kompromisy w porównaniu z uruchomieniem pojedynczego kroku wewnątrz tf.function
. Uruchamianie wielu kroków w funkcji tf.function
jest mniej elastyczne — nie można uruchamiać rzeczy pochopnie ani arbitralnego kodu Pythona w obrębie tych kroków.
@tf.function
def train_multiple_steps(iterator, steps):
"""The step function for one training step."""
def step_fn(inputs):
"""The computation to run on each TPU device."""
images, labels = inputs
with tf.GradientTape() as tape:
logits = model(images, training=True)
loss = tf.keras.losses.sparse_categorical_crossentropy(
labels, logits, from_logits=True)
loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
training_loss.update_state(loss * strategy.num_replicas_in_sync)
training_accuracy.update_state(labels, logits)
for _ in tf.range(steps):
strategy.run(step_fn, args=(next(iterator),))
# Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get
# retraced if the value changes.
train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch))
print('Current step: {}, training loss: {}, accuracy: {}%'.format(
optimizer.iterations.numpy(),
round(float(training_loss.result()), 4),
round(float(training_accuracy.result()) * 100, 2)))
Current step: 1800, training loss: 0.0081, accuracy: 99.74%
Następne kroki
- Dokumentacja Google Cloud TPU : jak skonfigurować i uruchomić Google Cloud TPU.
- Notatniki Google Cloud TPU Colab : kompleksowe przykłady szkoleń.
- Przewodnik po wydajności Google Cloud TPU : jeszcze bardziej zwiększ wydajność Cloud TPU, dostosowując parametry konfiguracyjne Cloud TPU do swojej aplikacji
- Szkolenie rozproszone z TensorFlow : Jak korzystać ze strategii dystrybucji — w tym
tf.distribute.TPUStrategy
— z przykładami pokazującymi najlepsze praktyki.