TensorFlow.org에서 보기 | Google Colab에서 실행 | View source on GitHub | 노트북 다운로드 |
시작하기
감독 학습을 수행할 때 fit()
를 사용할 수 있으며 모든 것이 원활하게 작동합니다.
훈련 루프를 처음부터 작성해야 하는 경우, GradientTape
를 사용하여 모든 세부 사항을 제어할 수 있습니다.
그러나 사용자 정의 훈련 알고리즘이 필요하지만 콜백, 내장 배포 지원 또는 단계 융합과 같은 fit()
의 편리한 특성을 계속 활용하려면 어떻게 해야 할까요?
Keras의 핵심 원칙은 복잡성의 점진적인 공개입니다. 항상 점진적으로 저수준 워크플로부터 시작할 수 있어야 합니다. 높은 수준의 기능이 자신의 사용 사례와 정확하게 일치하지 않다고 해서 절망할 필요는 없습니다. 적절한 수준의 고수준 편의를 유지하면서 작은 세부 사항을 보다 효과적으로 제어할 수 있어야 합니다.
fit()
를 사용자 정의해야 하는 경우, Model
클래스의 훈련 단계 함수를 재정의해야 합니다. 이 함수는 모든 데이터 배치에 대해 fit()
에 의해 호출되는 함수입니다. 그런 다음 평소와 같이 fit()
을 호출 할 수 있으며 자체 학습 알고리즘을 실행합니다.
이 패턴은 Functional API를 사용하여 모델을 빌드하는 데 방해가 되지 않습니다. Sequential
모델, Functional API 모델, 또는 하위 클래스화된 모델과 관계없이 수행할 수 있습니다.
어떻게 동작하는지 살펴보겠습니다.
설정
TensorFlow 2.2 이상이 필요합니다.
import tensorflow as tf
from tensorflow import keras
2022-12-14 22:25:44.982786: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory 2022-12-14 22:25:44.982892: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory 2022-12-14 22:25:44.982903: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
첫 번째 간단한 예제
간단한 예제부터 시작하겠습니다.
keras.Model
을 하위 클래스화하는 새 클래스를 만듭니다.train_step(self, data)
메서드를 재정의합니다.- 손실을 포함하여 사전 매핑 메트릭 이름을 현재 값으로 반환합니다.
입력 인수 data
는 훈련 데이터에 맞게 전달됩니다.
fit(x, y, ...)
를 호출하여 Numpy 배열을 전달하면data
는 튜플(x, y)
가 됩니다.tf.data.Dataset
를 전달하는 경우,fit(dataset, ...)
를 호출하여data
가 각 배치에서dataset
에 의해 산출됩니다.
train_step
메서드의 본문에서 이미 익숙한 것과 유사한 정기적인 훈련 업데이트를 구현합니다. 중요한 것은 self.compiled_loss
를 통해 손실을 계산하여 compile()
로 전달된 손실 함수를 래핑합니다.
마찬가지로, self.compiled_metrics.update_state(y, y_pred)
를 호출하여 compile()
에 전달된 메트릭의 상태를 업데이트하고, 마지막에 self.metrics
의 결과를 쿼리하여 현재 값을 검색합니다.
class CustomModel(keras.Model):
def train_step(self, data):
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute the loss value
# (the loss function is configured in `compile()`)
loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update metrics (includes the metric that tracks the loss)
self.compiled_metrics.update_state(y, y_pred)
# Return a dict mapping metric names to current value
return {m.name: m.result() for m in self.metrics}
다음을 시도해봅시다.
import numpy as np
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3 32/32 [==============================] - 1s 2ms/step - loss: 0.6662 - mae: 0.6898 Epoch 2/3 32/32 [==============================] - 0s 2ms/step - loss: 0.2882 - mae: 0.4303 Epoch 3/3 32/32 [==============================] - 0s 2ms/step - loss: 0.2313 - mae: 0.3891 <keras.callbacks.History at 0x7f54ec034e80>
더 낮은 수준으로 구성하기
당연히 compile()
에서 손실 함수의 전달을 건너뛰고, 대신 train_step
에서 수동으로 모두 수행할 수 있습니다. 메트릭도 마찬가지입니다.
다음은 옵티마이저를 구성하기 위해 compile()
만 사용하는 하위 수준의 예입니다.
- 먼저 손실과 MAE 점수를 추적하기 위해
Metric
인스턴스를 생성합니다. - (메트릭에 대한
update_state()
를 호출하여) 메트릭의 상태를 업데이트하는 사용자 정의train_step()
을 구현한 다음, 쿼리하여(result()
를 통해) 현재 평균 값을 반환하여 진행률 표시줄에 표시되고 모든 콜백에 전달되도록 합니다. - 각 epoch 사이의 메트릭에 대해
reset_states()
를 호출해야 합니다. 그렇지 않으면,result()
를 호출하면 훈련 시작 이후부터 평균이 반환되지만, 일반적으로 epoch당 평균을 사용합니다. 다행히도 프레임워크에서는 다음과 같이 수행할 수 있습니다. 즉, 재설정하려는 매트릭을 모델의metrics
속성에 나열하기만 하면 됩니다. 모델은 각fit()
epoch가 시작될 때 또는evaluate()
호출이 시작될 때 여기에 나열된 모든 객체에 대해reset_states()
를 호출합니다.
loss_tracker = keras.metrics.Mean(name="loss")
mae_metric = keras.metrics.MeanAbsoluteError(name="mae")
class CustomModel(keras.Model):
def train_step(self, data):
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute our own loss
loss = keras.losses.mean_squared_error(y, y_pred)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Compute our own metrics
loss_tracker.update_state(loss)
mae_metric.update_state(y, y_pred)
return {"loss": loss_tracker.result(), "mae": mae_metric.result()}
@property
def metrics(self):
# We list our `Metric` objects here so that `reset_states()` can be
# called automatically at the start of each epoch
# or at the start of `evaluate()`.
# If you don't implement this property, you have to call
# `reset_states()` yourself at the time of your choosing.
return [loss_tracker, mae_metric]
# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
# We don't passs a loss or metrics here.
model.compile(optimizer="adam")
# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5 32/32 [==============================] - 0s 2ms/step - loss: 0.2728 - mae: 0.4312 Epoch 2/5 32/32 [==============================] - 0s 2ms/step - loss: 0.2168 - mae: 0.3769 Epoch 3/5 32/32 [==============================] - 0s 2ms/step - loss: 0.2082 - mae: 0.3701 Epoch 4/5 32/32 [==============================] - 0s 2ms/step - loss: 0.2004 - mae: 0.3634 Epoch 5/5 32/32 [==============================] - 0s 2ms/step - loss: 0.1923 - mae: 0.3558 <keras.callbacks.History at 0x7f54e01be760>
sample_weight
및 class_weight
지원하기
첫 번째 기본 예제에서는 샘플 가중치에 대해 언급하지 않았습니다. fit()
인수 sample_weight
및 class_weight
를 지원하려면 다음을 수행하면 됩니다.
data
인수에서sample_weight
패키지를 풉니다.compiled_loss
및compiled_metrics
에 전달합니다(손실 및 메트릭을 위해compile()
에 의존하지 않는다면 수동으로 적용할 수도 있습니다).- 다음은 그 목록입니다.
class CustomModel(keras.Model):
def train_step(self, data):
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
if len(data) == 3:
x, y, sample_weight = data
else:
sample_weight = None
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute the loss value.
# The loss function is configured in `compile()`.
loss = self.compiled_loss(
y,
y_pred,
sample_weight=sample_weight,
regularization_losses=self.losses,
)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update the metrics.
# Metrics are configured in `compile()`.
self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)
# Return a dict mapping metric names to current value.
# Note that it will include the loss (tracked in self.metrics).
return {m.name: m.result() for m in self.metrics}
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3 32/32 [==============================] - 1s 2ms/step - loss: 0.1389 - mae: 0.4280 Epoch 2/3 32/32 [==============================] - 0s 2ms/step - loss: 0.1296 - mae: 0.4126 Epoch 3/3 32/32 [==============================] - 0s 2ms/step - loss: 0.1229 - mae: 0.4022 <keras.callbacks.History at 0x7f54e0179c70>
자신만의 평가 단계 제공하기
model.evaluate()
호출에 대해 같은 작업을 수행하려면 어떻게 해야 할까요? 정확히 같은 방식으로 test_step
을 재정의합니다. 다음과 같습니다.
class CustomModel(keras.Model):
def test_step(self, data):
# Unpack the data
x, y = data
# Compute predictions
y_pred = self(x, training=False)
# Updates the metrics tracking the loss
self.compiled_loss(y, y_pred, regularization_losses=self.losses)
# Update the metrics.
self.compiled_metrics.update_state(y, y_pred)
# Return a dict mapping metric names to current value.
# Note that it will include the loss (tracked in self.metrics).
return {m.name: m.result() for m in self.metrics}
# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])
# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 [==============================] - 0s 2ms/step - loss: 1.1476 - mae: 0.9479 [1.147562026977539, 0.9478825926780701]
마무리: 엔드-투-엔드 GAN 예제
방금 배운 모든 내용을 활용하는 엔드 투 엔드 예제를 살펴보겠습니다.
다음을 고려합니다.
- 생성기 네트워크는 28x28x1 이미지를 생성합니다.
- discriminator 네트워크는 28x28x1 이미지를 두 개의 클래스("false" 및 "real")로 분류하기 위한 것입니다.
- 각각 하나의 옵티마이저를 가집니다.
- discriminator를 훈련하는 손실 함수입니다.
from tensorflow.keras import layers
# Create the discriminator
discriminator = keras.Sequential(
[
keras.Input(shape=(28, 28, 1)),
layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.GlobalMaxPooling2D(),
layers.Dense(1),
],
name="discriminator",
)
# Create the generator
latent_dim = 128
generator = keras.Sequential(
[
keras.Input(shape=(latent_dim,)),
# We want to generate 128 coefficients to reshape into a 7x7x128 map
layers.Dense(7 * 7 * 128),
layers.LeakyReLU(alpha=0.2),
layers.Reshape((7, 7, 128)),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
],
name="generator",
)
다음은 자신만의 서명을 사용하기 위해 compile()
을 재정의하고 train_step
17줄로 전체 GAN 알고리즘을 구현하는 특성 완료형 GAN 클래스입니다.
class GAN(keras.Model):
def __init__(self, discriminator, generator, latent_dim):
super(GAN, self).__init__()
self.discriminator = discriminator
self.generator = generator
self.latent_dim = latent_dim
def compile(self, d_optimizer, g_optimizer, loss_fn):
super(GAN, self).compile()
self.d_optimizer = d_optimizer
self.g_optimizer = g_optimizer
self.loss_fn = loss_fn
def train_step(self, real_images):
if isinstance(real_images, tuple):
real_images = real_images[0]
# Sample random points in the latent space
batch_size = tf.shape(real_images)[0]
random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
# Decode them to fake images
generated_images = self.generator(random_latent_vectors)
# Combine them with real images
combined_images = tf.concat([generated_images, real_images], axis=0)
# Assemble labels discriminating real from fake images
labels = tf.concat(
[tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
)
# Add random noise to the labels - important trick!
labels += 0.05 * tf.random.uniform(tf.shape(labels))
# Train the discriminator
with tf.GradientTape() as tape:
predictions = self.discriminator(combined_images)
d_loss = self.loss_fn(labels, predictions)
grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
self.d_optimizer.apply_gradients(
zip(grads, self.discriminator.trainable_weights)
)
# Sample random points in the latent space
random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
# Assemble labels that say "all real images"
misleading_labels = tf.zeros((batch_size, 1))
# Train the generator (note that we should *not* update the weights
# of the discriminator)!
with tf.GradientTape() as tape:
predictions = self.discriminator(self.generator(random_latent_vectors))
g_loss = self.loss_fn(misleading_labels, predictions)
grads = tape.gradient(g_loss, self.generator.trainable_weights)
self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
return {"d_loss": d_loss, "g_loss": g_loss}
테스트해 봅시다.
# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)
gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)
# To limit the execution time, we only train on 100 batches. You can train on
# the entire dataset. You will need about 20 epochs to get nice results.
gan.fit(dataset.take(100), epochs=1)
100/100 [==============================] - 5s 15ms/step - d_loss: 0.4243 - g_loss: 0.8427 <keras.callbacks.History at 0x7f54cc0da700>
딥 러닝의 기본 개념은 간단합니다. 구현이 고통스러울 이유가 없습니다.