이 튜토리얼에서는 tf.distribute.Strategy
TensorFlow API와 사용자 정의 훈련 루프를 사용하여 여러 처리 장치(GPU, 여러 머신 또는 TPU)에 훈련을 배포하기 위한 추상화 방법을 보여줍니다. 이 예에서는 28 x 28 크기의 이미지 70,000개가 포함된 Fashion MNIST 데이터세트에서 간단한 컨볼루션 신경망을 훈련합니다.
사용자 정의 훈련 루프는 훈련에 대한 유연성과 더 나은 통제력을 제공합니다. 또한 모델과 훈련 루프를 쉽게 디버그할 수 있습니다.
# Import TensorFlow
import tensorflow as tf
# Helper libraries
import numpy as np
import os
Fashion MNIST 데이터세트 다운로드하기
fashion_mnist = tf.keras.datasets.fashion_mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
# Add a dimension to the array -> new shape == (28, 28, 1)
# This is done because the first layer in our model is a convolutional
# layer and it requires a 4D input (batch_size, height, width, channels).
# batch_size dimension will be added later on.
train_images = train_images[..., None]
test_images = test_images[..., None]
# Scale the images to the [0, 1] range.
train_images = train_images / np.float32(255)
test_images = test_images / np.float32(255)
변수와 그래프를 분산하는 전략 만들기
전략이 어떻게 동작할까요?
- 모든 변수와 모델 그래프는 복제본 간에 복제됩니다.
- 입력은 장치에 고르게 분배되어 들어갑니다.
- 각 장치는 주어지는 입력에 대해서 손실(loss)과 그래디언트를 계산합니다.
- 그래디언트들을 전부 더함으로써 모든 장치들 간에 그래디언트들이 동기화됩니다.
- 동기화된 후에, 동일한 업데이트가 각 장치에 있는 변수의 복사본(copies)에 동일하게 적용됩니다.
참고: 아래의 모든 코드를 단일 범위에 넣을 수 있습니다. 이 예에서는 설명을 위해 여러 코드 셀로 나눕니다.
# If the list of devices is not specified in
# `tf.distribute.MirroredStrategy` constructor, they will be auto-detected.
strategy = tf.distribute.MirroredStrategy()
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:GPU:1', '/job:localhost/replica:0/task:0/device:GPU:2', '/job:localhost/replica:0/task:0/device:GPU:3')
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
Number of devices: 4
입력 파이프라인 설정하기
BUFFER_SIZE = len(train_images)
GLOBAL_BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync
데이터세트를 만들고 배포합니다.
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(BUFFER_SIZE).batch(GLOBAL_BATCH_SIZE)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)
모델 만들기
을 사용하여 모델을 만듭니다. 모델 하위 클래스화 API 또는 함수형 API를 사용하여 이를 수행할 수도 있습니다.
def create_model():
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Conv2D(64, 3, activation='relu'),
tf.keras.layers.Dense(64, activation='relu'),
return model
# Create a checkpoint directory to store the checkpoints.
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
손실 함수 정의하기
일반적으로 단일 GPU/CPU가 있는 단일 시스템에서 손실 함수는 입력 배치의 예제 수로 나뉩니다.
그렇다면 tf.distribute.Strategy
를 사용할 때 손실을 어떻게 계산해야 할까요?
예를 들어 GPU가 4개 있고 배치 크기가 64라고 가정해 보겠습니다. 하나의 입력 배치가 전체 복제본(4개의 GPU)에 걸쳐 분배되고 각 복제본은 크기 16의 입력을 받습니다.
각 복제본의 모델은 해당 입력으로 순방향 전달을 수행하고 손실을 계산합니다. 이제 손실을 해당 입력의 예제 수로 나누는 대신(BATCH_SIZE_PER_REPLICA = 16), 손실을 GLOBAL_BATCH_SIZE(64)로 나누어야 합니다.
이렇게 하는 이유는 무엇일까요?
- 각 복제본에서 그래디언트가 계산된 후 이를 합산하여 전체 복제본에 걸쳐 동기화되기 때문에 이렇게 해야 합니다.
TensorFlow에서 이 작업을 어떻게 수행할까요?
이 튜토리얼에서와 같이 사용자 지정 훈련 루프를 작성하는 경우 예제당 손실을 합산하고 합계를 GLOBAL_BATCH_SIZE로 나누어야 합니다:
scale_loss = tf.reduce_sum(loss) * (1. / GLOBAL_BATCH_SIZE)
를 사용하여 예제당 손실, 선택적 샘플 가중치, GLOBAL_BATCH_SIZE를 인수로 사용하고 조정된 손실을 반환할 수 있습니다.모델에서 정규화 손실을 사용하는 경우 복제본 수에 따라 손실 값을 확장해야 합니다.
함수를 사용하여 이를 수행할 수 있습니다.tf.reduce_mean
을 사용하는 것은 권장하지 않습니다. 이렇게 하면 손실이 실제 복제본 배치 크기별로 나눠지는데, 이는 단계별로 다를 수 있습니다.이 축소 및 크기 조정은 keras
에서 자동으로 수행됩니다.tf.keras.losses
클래스를 사용하는 경우(아래 예와 같이) 손실 축소는NONE
중 하나로 명시적으로 지정되어야 합니다.AUTO
와 함께 사용할 때 허용되지 않습니다.AUTO
는 사용자가 어떤 축소가 필요한지 명시적으로 생각해야 하므로(분산된 경우에 이러한 축소가 올바른지 확인하기 위해) 허용되지 않습니다.SUM_OVER_BATCH_SIZE
는 현재 복제본 배치 크기별로만 나누고 복제본 수로 나누는 일은 사용자에게 맡기는 데, 이를 쉽게 놓칠 수 있다는 점 때문에 허용되지 않습니다. 따라서 사용자가 직접 명시적으로 축소를 수행해야 합니다.labels
이 다차원인 경우 각 샘플의 요소 수에 걸쳐per_example_loss
의 평균을 구합니다. 예를 들어predictions
의 형상이(batch_size, H, W, n_classes)
이(batch_size, H, W)
인 경우, 다음과 같이per_example_loss
를 업데이트해야 합니다:per_example_loss /= tf.cast(tf.reduce_prod(tf.shape(labels)[1:]), tf.float32)
주의: 손실의 형상을 확인하세요.
의 손실 함수는 일반적으로 입력의 마지막 차원에 대한 평균을 반환합니다. 손실 클래스는 이러한 함수를 래핑합니다. 손실 클래스의 인스턴스를 생성할 때reduction=Reduction.NONE
을 전달하는 것은 "추가적인 축소가 없음"을 의미합니다.[batch, W, H, n_classes]
의 예제 입력 형상을 갖는 범주형 손실의 경우,n_classes
차원이 축소됩니다.losses.mean_squared_error
와 같은 포인트별 손실의 경우, 더미 축을 포함시켜[batch, W, H, 1]
이[batch, W, H]
로 축소되도록 합니다. 더미 축이 없으면[batch, W, H]
가[batch, W]
로 잘못 축소됩니다.
with strategy.scope():
# Set reduction to `NONE` so you can do the reduction afterwards and divide by
# global batch size.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
def compute_loss(labels, predictions):
per_example_loss = loss_object(labels, predictions)
return tf.nn.compute_average_loss(per_example_loss, global_batch_size=GLOBAL_BATCH_SIZE)
손실과 정확도를 기록하기 위한 지표 정의하기
이 지표(metrics)는 테스트 손실과 훈련 정확도, 테스트 정확도를 기록합니다. .result()
를 사용해서 누적된 통계값들을 언제나 볼 수 있습니다.
with strategy.scope():
test_loss = tf.keras.metrics.Mean(name='test_loss')
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
훈련 루프
# A model, an optimizer, and a checkpoint must be created under `strategy.scope`.
with strategy.scope():
model = create_model()
optimizer = tf.keras.optimizers.Adam()
checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
def train_step(inputs):
images, labels = inputs
with tf.GradientTape() as tape:
predictions = model(images, training=True)
loss = compute_loss(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
train_accuracy.update_state(labels, predictions)
return loss
def test_step(inputs):
images, labels = inputs
predictions = model(images, training=False)
t_loss = loss_object(labels, predictions)
test_accuracy.update_state(labels, predictions)
# `run` replicates the provided computation and runs it
# with the distributed input.
def distributed_train_step(dataset_inputs):
per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
def distributed_test_step(dataset_inputs):
return strategy.run(test_step, args=(dataset_inputs,))
for epoch in range(EPOCHS):
total_loss = 0.0
num_batches = 0
for x in train_dist_dataset:
total_loss += distributed_train_step(x)
num_batches += 1
train_loss = total_loss / num_batches
for x in test_dist_dataset:
if epoch % 2 == 0:
template = ("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, "
"Test Accuracy: {}")
print(template.format(epoch + 1, train_loss,
train_accuracy.result() * 100, test_loss.result(),
test_accuracy.result() * 100))
위의 예제에서 주목해야 하는 부분
for x in ...
구문을 사용하여train_dist_dataset
를 반복합니다.- 스케일이 조정된 손실은
의 반환값입니다.tf.distribute.Strategy.reduce
호출을 사용해서 장치들 간의 스케일이 조정된 손실 값을 전부 합칩니다. 그리고 나서tf.distribute.Strategy.reduce
반환 값을 더하는 식으로 배치 간의 손실을 모읍니다. tf.keras.Metrics
에 의해 실행되는train_step
내부에서 업데이트되어야 합니다.tf.distribute.Strategy.run
은 전략의 각 로컬 복제본에서 결과를 반환하며 이 결과를 사용하는 방법에는 여러 가지가 있습니다.tf.distribute.Strategy.reduce
를 수행하여 집계된 값을 얻을 수 있습니다.tf.distribute.Strategy.experimental_local_results
를 수행하여 로컬 복제본당 하나씩 결과에 포함된 값 목록을 가져올 수도 있습니다.
최신 체크포인트를 불러와서 테스트하기
를 사용해서 체크포인트가 만들어진 모델은 전략 사용 여부에 상관없이 불러올 수 있습니다.
eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
new_model = create_model()
new_optimizer = tf.keras.optimizers.Adam()
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(GLOBAL_BATCH_SIZE)
def eval_step(images, labels):
predictions = new_model(images, training=False)
eval_accuracy(labels, predictions)
checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
for images, labels in test_dataset:
eval_step(images, labels)
print('Accuracy after restoring the saved model without strategy: {}'.format(
eval_accuracy.result() * 100))
Accuracy after restoring the saved model without strategy: 88.55999755859375
데이터셋에 대해 반복작업을 하는 다른 방법들
반복자(iterator)를 사용하기
만약 주어진 스텝의 수에 따라서 반복하기 원하면서 전체 데이터세트를 보는 것을 원치 않는다면, iter
를 호출하여 반복자를 만들 수 있습니다. 그 다음 명시적으로 next
를 호출합니다. 또한, tf.function 내부 또는 외부에서 데이터세트를 반복하도록 설정할 수 있습니다. 다음은 반복자를 사용하여 tf.function 외부에서 데이터세트를 반복하는 코드 예제입니다.
for _ in range(EPOCHS):
total_loss = 0.0
num_batches = 0
train_iter = iter(train_dist_dataset)
for _ in range(10):
total_loss += distributed_train_step(next(train_iter))
num_batches += 1
average_train_loss = total_loss / num_batches
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print(template.format(epoch + 1, average_train_loss, train_accuracy.result() * 100))
Epoch 10, Loss: 0.24009506404399872, Accuracy: 90.78125 Epoch 10, Loss: 0.22674480080604553, Accuracy: 91.796875 Epoch 10, Loss: 0.22516441345214844, Accuracy: 91.7578125 Epoch 10, Loss: 0.24468576908111572, Accuracy: 90.546875 Epoch 10, Loss: 0.22958669066429138, Accuracy: 91.8359375 Epoch 10, Loss: 0.2196839302778244, Accuracy: 92.1875 Epoch 10, Loss: 0.2384246289730072, Accuracy: 90.9765625 Epoch 10, Loss: 0.21778538823127747, Accuracy: 91.953125 Epoch 10, Loss: 0.20954498648643494, Accuracy: 91.9921875 Epoch 10, Loss: 0.18693020939826965, Accuracy: 93.0859375
tf.function 내부에서 반복하기
for x in ...
구조를 사용하거나 위에서 했던 것처럼 반복자를 생성하여 tf.function
내부의 전체 입력 train_dist_dataset
에 대해 반복할 수도 있습니다. 아래 예제는 @tf.function
데코레이터로 훈련한 하나의 epoch를 래핑하고 함수 내에서 train_dist_dataset
을 반복하는 것을 보여줍니다.
def distributed_train_epoch(dataset):
total_loss = 0.0
num_batches = 0
for x in dataset:
per_replica_losses = strategy.run(train_step, args=(x,))
total_loss += strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
num_batches += 1
return total_loss / tf.cast(num_batches, dtype=tf.float32)
for epoch in range(EPOCHS):
train_loss = distributed_train_epoch(train_dist_dataset)
template = ("Epoch {}, Loss: {}, Accuracy: {}")
print(template.format(epoch + 1, train_loss, train_accuracy.result() * 100))
장치 간의 훈련 손실 기록하기
노트: 일반적인 규칙으로, tf.keras.Metrics
를 사용하여 샘플당 손실 값을 기록하고 장치 내부에서 값이 합쳐지는 것을 피해야 합니다.
손실 크기 조정 계산이 수행되기 때문에 tf.keras.metrics.Mean
을 사용하여 여러 복제본에서 훈련 손실을 추적하는 것은 권장되지 않습니다.
예를 들어, 다음과 같은 조건의 훈련을 수행한다고 합시다.
- 두개의 장치
- 두개의 샘플들이 각 장치에 의해 처리됩니다.
- 손실 값을 산출합니다: 각각의 장치에 대해 [2, 3]과 [4, 5]
- Global batch size = 4
손실의 스케일 조정을 하면, 손실 값을 더하고 전역 배치 크기로 나누어 각 장치에 대한 샘플당 손실값을 계산할 수 있습니다. 이 경우에는 (2 + 3) / 4 = 1.25
및 (4 + 5) / 4 = 2.25
만약 tf.keras.metrics.Mean
을 사용하여 두 복제본의 손실을 추적한다면 결과가 다릅니다. 이 예에서는 결과적으로 메트릭에서 result()
가 호출될 때 total
3.50개와 count
2개가 되고, total
= 1.75가 됩니다. tf.keras.Metrics
로 계산된 손실은 동기화된 복제본 수에 해당하는 추가 인자로 조정됩니다.
