Посмотреть на TensorFlow.org | Запустить в Google Colab | Посмотреть исходный код на GitHub | Скачать блокнот |
Прежде чем запускать этот блокнот Colab, убедитесь, что ваш аппаратный ускоритель является TPU, проверив настройки ноутбука: Время выполнения > Изменить тип среды выполнения > Аппаратный ускоритель > TPU .
Настраивать
import tensorflow as tf
import os
import tensorflow_datasets as tfds
/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/requests/__init__.py:104: RequestsDependencyWarning: urllib3 (1.26.8) or chardet (2.3.0)/charset_normalizer (2.0.11) doesn't match a supported version! RequestsDependencyWarning)
Инициализация ТПУ
TPU обычно являются рабочими Cloud TPU, которые отличаются от локального процесса, выполняющего пользовательскую программу Python. Таким образом, вам нужно выполнить некоторую работу по инициализации, чтобы подключиться к удаленному кластеру и инициализировать TPU. Обратите внимание, что аргумент tpu
для tf.distribute.cluster_resolver.TPUClusterResolver
— это специальный адрес только для Colab. Если вы запускаете свой код в Google Compute Engine (GCE), вам следует вместо этого передать имя вашего облачного TPU.
resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
INFO:tensorflow:Clearing out eager caches INFO:tensorflow:Clearing out eager caches INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.10:8470 INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.10:8470 INFO:tensorflow:Finished initializing TPU system. INFO:tensorflow:Finished initializing TPU system. All devices: [LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU')]
Размещение устройства вручную
После инициализации TPU вы можете использовать ручное размещение устройства, чтобы разместить вычисления на одном устройстве TPU:
a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
with tf.device('/TPU:0'):
c = tf.matmul(a, b)
print("c device: ", c.device)
print(c)
c device: /job:worker/replica:0/task:0/device:TPU:0 tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32)
Стратегии распространения
Обычно вы запускаете свою модель на нескольких TPU параллельно данным. Для распространения вашей модели на несколько TPU (или других ускорителей) TensorFlow предлагает несколько стратегий распространения. Вы можете заменить свою стратегию распространения, и модель будет работать на любом устройстве (TPU). Дополнительные сведения см. в руководстве по стратегии распространения .
Чтобы продемонстрировать это, создайте объект tf.distribute.TPUStrategy
:
strategy = tf.distribute.TPUStrategy(resolver)
INFO:tensorflow:Found TPU system: INFO:tensorflow:Found TPU system: INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Cores: 8 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Workers: 1 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 INFO:tensorflow:*** Num TPU Cores Per Worker: 8 INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0) INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)
Чтобы воспроизвести вычисление, чтобы оно могло выполняться во всех ядрах TPU, вы можете передать его в API strategy.run
. Ниже приведен пример, который показывает, что все ядра получают одни и те же входные данные (a, b)
и выполняют умножение матриц на каждом ядре независимо. Выходными данными будут значения из всех реплик.
@tf.function
def matmul_fn(x, y):
z = tf.matmul(x, y)
return z
z = strategy.run(matmul_fn, args=(a, b))
print(z)
PerReplica:{ 0: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 1: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 2: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 3: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 4: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 5: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 6: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32), 7: tf.Tensor( [[22. 28.] [49. 64.]], shape=(2, 2), dtype=float32) }
Классификация ТПУ
Раскрыв основные понятия, рассмотрим более конкретный пример. В этом разделе показано, как использовать стратегию распространения — tf.distribute.TPUStrategy
— для обучения модели Keras на облачном TPU.
Определить модель Keras
Начните с определения модели Sequential
Keras для классификации изображений в наборе данных MNIST с использованием Keras. Это ничем не отличается от того, что вы использовали бы, если бы тренировались на процессорах или графических процессорах. Обратите внимание, что создание модели Keras должно происходить внутри strategy.scope
, чтобы переменные можно было создавать на каждом устройстве TPU. Другие части кода не обязательно должны находиться в рамках стратегии.
def create_model():
return tf.keras.Sequential(
[tf.keras.layers.Conv2D(256, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(256, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(256, activation='relu'),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)])
Загрузите набор данных
Эффективное использование API tf.data.Dataset
имеет решающее значение при использовании облачного TPU, поскольку невозможно использовать облачные TPU, если вы не можете достаточно быстро передавать им данные. Вы можете узнать больше о производительности набора данных в руководстве по производительности входного конвейера .
Для всех экспериментов, кроме самых простых (с использованием tf.data.Dataset.from_tensor_slices
или других данных в графе), вам необходимо хранить все файлы данных, считанные набором данных, в корзинах Google Cloud Storage (GCS).
В большинстве случаев рекомендуется преобразовать ваши данные в формат TFRecord
и использовать tf.data.TFRecordDataset
для их чтения. Ознакомьтесь с руководством по TFRecord и tf.Example, чтобы узнать, как это сделать. Это не является жестким требованием, и вы можете использовать другие средства чтения наборов данных, такие как tf.data.FixedLengthRecordDataset
или tf.data.TextLineDataset
.
Вы можете загружать целые небольшие наборы данных в память, используя tf.data.Dataset.cache
.
Независимо от используемого формата данных настоятельно рекомендуется использовать большие файлы порядка 100 МБ. Это особенно важно при работе в сети, так как затраты на открытие файла значительно выше.
Как показано в приведенном ниже коде, вы должны использовать модуль tensorflow_datasets
, чтобы получить копию обучающих и тестовых данных MNIST. Обратите внимание, что в try_gcs
используется копия, доступная в общедоступной корзине GCS. Если вы не укажете это, TPU не сможет получить доступ к загруженным данным.
def get_dataset(batch_size, is_training=True):
split = 'train' if is_training else 'test'
dataset, info = tfds.load(name='mnist', split=split, with_info=True,
as_supervised=True, try_gcs=True)
# Normalize the input data.
def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255.0
return image, label
dataset = dataset.map(scale)
# Only shuffle and repeat the dataset in training. The advantage of having an
# infinite dataset for training is to avoid the potential last partial batch
# in each epoch, so that you don't need to think about scaling the gradients
# based on the actual batch size.
if is_training:
dataset = dataset.shuffle(10000)
dataset = dataset.repeat()
dataset = dataset.batch(batch_size)
return dataset
Обучите модель с помощью высокоуровневых API-интерфейсов Keras.
Вы можете обучить свою модель с помощью Keras fit
и compile
API. На этом шаге нет ничего специфичного для TPU — вы пишете код так, как если бы использовали несколько графических процессоров и MirroredStrategy
вместо TPUStrategy
. Вы можете узнать больше в учебном пособии по распределенному обучению с помощью Keras .
with strategy.scope():
model = create_model()
model.compile(optimizer='adam',
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['sparse_categorical_accuracy'])
batch_size = 200
steps_per_epoch = 60000 // batch_size
validation_steps = 10000 // batch_size
train_dataset = get_dataset(batch_size, is_training=True)
test_dataset = get_dataset(batch_size, is_training=False)
model.fit(train_dataset,
epochs=5,
steps_per_epoch=steps_per_epoch,
validation_data=test_dataset,
validation_steps=validation_steps)
Epoch 1/5 300/300 [==============================] - 18s 32ms/step - loss: 0.1433 - sparse_categorical_accuracy: 0.9564 - val_loss: 0.0452 - val_sparse_categorical_accuracy: 0.9859 Epoch 2/5 300/300 [==============================] - 6s 21ms/step - loss: 0.0335 - sparse_categorical_accuracy: 0.9898 - val_loss: 0.0318 - val_sparse_categorical_accuracy: 0.9899 Epoch 3/5 300/300 [==============================] - 6s 21ms/step - loss: 0.0199 - sparse_categorical_accuracy: 0.9935 - val_loss: 0.0397 - val_sparse_categorical_accuracy: 0.9866 Epoch 4/5 300/300 [==============================] - 6s 21ms/step - loss: 0.0109 - sparse_categorical_accuracy: 0.9964 - val_loss: 0.0436 - val_sparse_categorical_accuracy: 0.9892 Epoch 5/5 300/300 [==============================] - 6s 21ms/step - loss: 0.0103 - sparse_categorical_accuracy: 0.9963 - val_loss: 0.0481 - val_sparse_categorical_accuracy: 0.9881 <keras.callbacks.History at 0x7f0d485602e8>
Чтобы уменьшить накладные расходы Python и максимизировать производительность вашего TPU, передайте аргумент — steps_per_execution
— Model.compile
. В этом примере это увеличивает пропускную способность примерно на 50%:
with strategy.scope():
model = create_model()
model.compile(optimizer='adam',
# Anything between 2 and `steps_per_epoch` could help here.
steps_per_execution = 50,
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['sparse_categorical_accuracy'])
model.fit(train_dataset,
epochs=5,
steps_per_epoch=steps_per_epoch,
validation_data=test_dataset,
validation_steps=validation_steps)
Epoch 1/5 300/300 [==============================] - 12s 41ms/step - loss: 0.1515 - sparse_categorical_accuracy: 0.9537 - val_loss: 0.0416 - val_sparse_categorical_accuracy: 0.9863 Epoch 2/5 300/300 [==============================] - 3s 10ms/step - loss: 0.0366 - sparse_categorical_accuracy: 0.9891 - val_loss: 0.0410 - val_sparse_categorical_accuracy: 0.9875 Epoch 3/5 300/300 [==============================] - 3s 10ms/step - loss: 0.0191 - sparse_categorical_accuracy: 0.9938 - val_loss: 0.0432 - val_sparse_categorical_accuracy: 0.9865 Epoch 4/5 300/300 [==============================] - 3s 10ms/step - loss: 0.0141 - sparse_categorical_accuracy: 0.9951 - val_loss: 0.0447 - val_sparse_categorical_accuracy: 0.9875 Epoch 5/5 300/300 [==============================] - 3s 11ms/step - loss: 0.0093 - sparse_categorical_accuracy: 0.9968 - val_loss: 0.0426 - val_sparse_categorical_accuracy: 0.9884 <keras.callbacks.History at 0x7f0d0463cd68>
Обучите модель с помощью пользовательского цикла обучения
Вы также можете создавать и обучать свою модель напрямую с помощью API tf.function
и tf.distribute
. Вы можете использовать API-интерфейс strategy.experimental_distribute_datasets_from_function
для распространения набора данных с помощью функции набора данных. Обратите внимание, что в приведенном ниже примере размер пакета, переданный в набор данных, представляет собой размер пакета для каждой реплики, а не глобальный размер пакета. Чтобы узнать больше, ознакомьтесь с учебником Custom training with tf.distribute.Strategy .
Сначала создайте модель, наборы данных и tf.functions:
# Create the model, optimizer and metrics inside the strategy scope, so that the
# variables can be mirrored on each device.
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam()
training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
'training_accuracy', dtype=tf.float32)
# Calculate per replica batch size, and distribute the datasets on each TPU
# worker.
per_replica_batch_size = batch_size // strategy.num_replicas_in_sync
train_dataset = strategy.experimental_distribute_datasets_from_function(
lambda _: get_dataset(per_replica_batch_size, is_training=True))
@tf.function
def train_step(iterator):
"""The step function for one training step."""
def step_fn(inputs):
"""The computation to run on each TPU device."""
images, labels = inputs
with tf.GradientTape() as tape:
logits = model(images, training=True)
loss = tf.keras.losses.sparse_categorical_crossentropy(
labels, logits, from_logits=True)
loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
training_loss.update_state(loss * strategy.num_replicas_in_sync)
training_accuracy.update_state(labels, logits)
strategy.run(step_fn, args=(next(iterator),))
WARNING:tensorflow:From <ipython-input-1-5625c2a14441>:15: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: rename to distribute_datasets_from_function WARNING:tensorflow:From <ipython-input-1-5625c2a14441>:15: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version. Instructions for updating: rename to distribute_datasets_from_function
Затем запустите обучающий цикл:
steps_per_eval = 10000 // batch_size
train_iterator = iter(train_dataset)
for epoch in range(5):
print('Epoch: {}/5'.format(epoch))
for step in range(steps_per_epoch):
train_step(train_iterator)
print('Current step: {}, training loss: {}, accuracy: {}%'.format(
optimizer.iterations.numpy(),
round(float(training_loss.result()), 4),
round(float(training_accuracy.result()) * 100, 2)))
training_loss.reset_states()
training_accuracy.reset_states()
Epoch: 0/5 Current step: 300, training loss: 0.1339, accuracy: 95.79% Epoch: 1/5 Current step: 600, training loss: 0.0333, accuracy: 98.91% Epoch: 2/5 Current step: 900, training loss: 0.0176, accuracy: 99.43% Epoch: 3/5 Current step: 1200, training loss: 0.0126, accuracy: 99.61% Epoch: 4/5 Current step: 1500, training loss: 0.0122, accuracy: 99.61%
Повышение производительности с помощью нескольких шагов внутри tf.function
Вы можете улучшить производительность, выполнив несколько шагов в tf.function
. Это достигается путем включения в вызов tf.range
strategy.run
tf.function
, и AutoGraph преобразует его в tf.while_loop
на рабочем TPU.
Несмотря на улучшенную производительность, у этого метода есть компромиссы по сравнению с выполнением одного шага внутри tf.function
. Запуск нескольких шагов в tf.function
менее гибок — вы не можете запускать какие-то вещи или произвольный код Python внутри шагов.
@tf.function
def train_multiple_steps(iterator, steps):
"""The step function for one training step."""
def step_fn(inputs):
"""The computation to run on each TPU device."""
images, labels = inputs
with tf.GradientTape() as tape:
logits = model(images, training=True)
loss = tf.keras.losses.sparse_categorical_crossentropy(
labels, logits, from_logits=True)
loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
training_loss.update_state(loss * strategy.num_replicas_in_sync)
training_accuracy.update_state(labels, logits)
for _ in tf.range(steps):
strategy.run(step_fn, args=(next(iterator),))
# Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get
# retraced if the value changes.
train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch))
print('Current step: {}, training loss: {}, accuracy: {}%'.format(
optimizer.iterations.numpy(),
round(float(training_loss.result()), 4),
round(float(training_accuracy.result()) * 100, 2)))
Current step: 1800, training loss: 0.0081, accuracy: 99.74%
Следующие шаги
- Документация Google Cloud TPU : как настроить и запустить Google Cloud TPU.
- Блокноты Google Cloud TPU Colab : примеры сквозного обучения.
- Руководство по производительности Google Cloud TPU : улучшите производительность Cloud TPU, настроив параметры конфигурации Cloud TPU для своего приложения.
- Распределенное обучение с TensorFlow : как использовать стратегии распространения, включая
tf.distribute.TPUStrategy
с примерами, демонстрирующими лучшие практики.