TensorFlow.org で表示 | Google Colab で実行 | GitHub でソースを表示 | ノートブックをダウンロード |
はじめに
教師あり学習を実行するときに fit()
を使用するとスムーズに学習を進めることができます。
独自のトレーニングループを新規で書く必要がある場合には、GradientTape
を使用すると、細かく制御することができます。
しかし、カスタムトレーニングアルゴリズムが必要ながらも、コールバック、組み込みの分散サポート、ステップ結合など、fit()
の便利な機能を利用したい場合には、どうすればよいのでしょうか?
Keras の基本原則は、複雑性のプログレッシブディスクロージャ―です。常に段階的に低レベルのワークフローに入ることが可能で、高レベルの機能性がユースケースと完全に一致しない場合でも、急激に性能が落ちるようなことはありません。相応の高レベルの利便性を維持しながら細部をさらに制御することができます。
fit()
の動作をカスタマイズする必要がある場合は、Model
クラスのトレーニングステップ関数をオーバーライドする必要があります。これはデータのバッチごとに fit()
に呼び出される関数です。これによって、通常通り fit()
を呼び出せるようになり、独自の学習アルゴリズムが実行されるようになります。
このパターンは Functional API を使用したモデル構築を妨げるものではないことに注意してください。これは、Sequential
モデル、Functional API モデル、サブクラス化されたモデルのいずれを構築する場合にも適用可能です。
では、その仕組みを見ていきましょう。
セットアップ
TensorFlow 2.2 以降が必要です。
import tensorflow as tf
from tensorflow import keras
2022-12-14 21:05:04.471118: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory 2022-12-14 21:05:04.471213: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory 2022-12-14 21:05:04.471222: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
最初の簡単な例
簡単な例から始めてみましょう。
keras.Model
をサブクラス化する新しいクラスを作成します。train_step(self, data)
メソッドだけをオーバーライドします。- メトリクス名(損失を含む)をマッピングするディクショナリを現在の値に返します。
入力引数の data
は、トレーニングデータとして適合するために渡される値です。
fit(x, y, ...)
を呼び出して Numpy 配列を渡す場合は、data
はタプル型(x, y)
になります。fit(dataset, ...)
を呼び出してtf.data.Dataset
を渡す場合は、data
は各バッチでdataset
により生成される値になります。
train_step
メソッドの本体には、既に使い慣れているものと同様の定期的なトレーニングアップデートを実装しています。重要なのは、損失の計算を self.compiled_loss
を介して行っていることで、それによって compile()
に渡された損失関数がラップされています。
同様に、self.compiled_metrics.update_state(y, y_pred)
を呼び出して compile()
に渡されたメトリクスの状態を更新し、最後に self.metrics
の結果をクエリして現在の値を取得しています。
class CustomModel(keras.Model):
def train_step(self, data):
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute the loss value
# (the loss function is configured in `compile()`)
loss = self.compiled_loss(y, y_pred, regularization_losses=self.losses)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update metrics (includes the metric that tracks the loss)
self.compiled_metrics.update_state(y, y_pred)
# Return a dict mapping metric names to current value
return {m.name: m.result() for m in self.metrics}
これを試してみましょう。
import numpy as np
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# Just use `fit` as usual
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=3)
Epoch 1/3 32/32 [==============================] - 1s 2ms/step - loss: 0.2539 - mae: 0.4101 Epoch 2/3 32/32 [==============================] - 0s 2ms/step - loss: 0.1882 - mae: 0.3503 Epoch 3/3 32/32 [==============================] - 0s 2ms/step - loss: 0.1825 - mae: 0.3448 <keras.callbacks.History at 0x7f056c01ee50>
低レベルにする
当然ながら、compile()
に損失関数を渡すことを省略し、代わりに train_step
ですべてを手動で実行することは可能です。これはメトリクスの場合でも同様です。
オプティマイザの構成に compile()
のみを使用した、低レベルの例を次に示します。
- まず、損失と MAE スコアを追跡する
Metric
インスタンスを作成します。 - これらのメトリクスの状態を更新するカスタム
train_step()
を実装し(メトリクスでupdate_state()
を呼び出します)、現在の平均値を返して進捗バーで表示し、任意のコールバックに渡せるようにメトリクスをクエリします(result() を使用)。 - エポックごとにメトリクスに
reset_states()
を呼び出す必要があるところに注意してください。呼び出さない場合、result()
は通常処理しているエポックごとの平均ではなく、トレーニングを開始してからの平均を返してしまいます。幸いにも、これはフレームワークが行ってくれるため、モデルのmetrics
プロパティにリセットするメトリクスをリストするだけで実現できます。モデルは、そこにリストされているオブジェクトに対するreset_states()
の呼び出しを各fit()
エポックの開始時またはevaluate()
への呼び出しの開始時に行うようになります。
loss_tracker = keras.metrics.Mean(name="loss")
mae_metric = keras.metrics.MeanAbsoluteError(name="mae")
class CustomModel(keras.Model):
def train_step(self, data):
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute our own loss
loss = keras.losses.mean_squared_error(y, y_pred)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Compute our own metrics
loss_tracker.update_state(loss)
mae_metric.update_state(y, y_pred)
return {"loss": loss_tracker.result(), "mae": mae_metric.result()}
@property
def metrics(self):
# We list our `Metric` objects here so that `reset_states()` can be
# called automatically at the start of each epoch
# or at the start of `evaluate()`.
# If you don't implement this property, you have to call
# `reset_states()` yourself at the time of your choosing.
return [loss_tracker, mae_metric]
# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
# We don't passs a loss or metrics here.
model.compile(optimizer="adam")
# Just use `fit` as usual -- you can use callbacks, etc.
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.fit(x, y, epochs=5)
Epoch 1/5 32/32 [==============================] - 0s 2ms/step - loss: 0.1901 - mae: 0.3546 Epoch 2/5 32/32 [==============================] - 0s 2ms/step - loss: 0.1799 - mae: 0.3451 Epoch 3/5 32/32 [==============================] - 0s 2ms/step - loss: 0.1705 - mae: 0.3363 Epoch 4/5 32/32 [==============================] - 0s 2ms/step - loss: 0.1620 - mae: 0.3284 Epoch 5/5 32/32 [==============================] - 0s 2ms/step - loss: 0.1545 - mae: 0.3212 <keras.callbacks.History at 0x7f056024c730>
sample_weight
と class_weight
をサポートする
最初の基本的な例では、サンプルの重み付けについては何も言及していないことに気付いているかもしれません。fit()
の引数 sample_weight
と class_weight
をサポートする場合には、次のようにします。
data
引数からsample_weight
をアンパックします。- それを
compiled_loss
とcompiled_metrics
に渡します(もちろん、 損失とメトリクスがcompile()
に依存しない場合は手動での適用が可能です)。 - それがリストです。
class CustomModel(keras.Model):
def train_step(self, data):
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
if len(data) == 3:
x, y, sample_weight = data
else:
sample_weight = None
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
# Compute the loss value.
# The loss function is configured in `compile()`.
loss = self.compiled_loss(
y,
y_pred,
sample_weight=sample_weight,
regularization_losses=self.losses,
)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update the metrics.
# Metrics are configured in `compile()`.
self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)
# Return a dict mapping metric names to current value.
# Note that it will include the loss (tracked in self.metrics).
return {m.name: m.result() for m in self.metrics}
# Construct and compile an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(optimizer="adam", loss="mse", metrics=["mae"])
# You can now use sample_weight argument
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
sw = np.random.random((1000, 1))
model.fit(x, y, sample_weight=sw, epochs=3)
Epoch 1/3 32/32 [==============================] - 1s 2ms/step - loss: 0.4508 - mae: 0.8067 Epoch 2/3 32/32 [==============================] - 0s 2ms/step - loss: 0.1881 - mae: 0.4751 Epoch 3/3 32/32 [==============================] - 0s 2ms/step - loss: 0.1230 - mae: 0.3819 <keras.callbacks.History at 0x7f0560205fd0>
独自の評価ステップを提供する
model.evaluate()
への呼び出しに同じことをする場合はどうしたらよいでしょう?その場合は、まったく同じ方法で test_step
をオーバーライドします。これは次のようになります。
class CustomModel(keras.Model):
def test_step(self, data):
# Unpack the data
x, y = data
# Compute predictions
y_pred = self(x, training=False)
# Updates the metrics tracking the loss
self.compiled_loss(y, y_pred, regularization_losses=self.losses)
# Update the metrics.
self.compiled_metrics.update_state(y, y_pred)
# Return a dict mapping metric names to current value.
# Note that it will include the loss (tracked in self.metrics).
return {m.name: m.result() for m in self.metrics}
# Construct an instance of CustomModel
inputs = keras.Input(shape=(32,))
outputs = keras.layers.Dense(1)(inputs)
model = CustomModel(inputs, outputs)
model.compile(loss="mse", metrics=["mae"])
# Evaluate with our custom test_step
x = np.random.random((1000, 32))
y = np.random.random((1000, 1))
model.evaluate(x, y)
32/32 [==============================] - 0s 2ms/step - loss: 0.2722 - mae: 0.4190 [0.27221357822418213, 0.4190024435520172]
まとめ: エンドツーエンド GAN の例
ここで学んだことをすべて採り入れたエンドツーエンドの例を見てみましょう。
以下を検討してみましょう。
- 28x28x1 の画像を生成するジェネレーターネットワーク。
- 28x28x1 の画像を 2 つのクラス(「偽物」と「本物」)に分類するディスクリミネーターネットワーク。
- それぞれに 1 つのオプティマイザ。
- ディスクリミネーターをトレーニングする損失関数。
from tensorflow.keras import layers
# Create the discriminator
discriminator = keras.Sequential(
[
keras.Input(shape=(28, 28, 1)),
layers.Conv2D(64, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(128, (3, 3), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.GlobalMaxPooling2D(),
layers.Dense(1),
],
name="discriminator",
)
# Create the generator
latent_dim = 128
generator = keras.Sequential(
[
keras.Input(shape=(latent_dim,)),
# We want to generate 128 coefficients to reshape into a 7x7x128 map
layers.Dense(7 * 7 * 128),
layers.LeakyReLU(alpha=0.2),
layers.Reshape((7, 7, 128)),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same"),
layers.LeakyReLU(alpha=0.2),
layers.Conv2D(1, (7, 7), padding="same", activation="sigmoid"),
],
name="generator",
)
ここにフィーチャーコンプリートの GAN クラスがあります。compile()
をオーバーライドして独自のシグネチャを使用することにより、GAN アルゴリズム全体をtrain_step
の 17 行で実装しています。
class GAN(keras.Model):
def __init__(self, discriminator, generator, latent_dim):
super(GAN, self).__init__()
self.discriminator = discriminator
self.generator = generator
self.latent_dim = latent_dim
def compile(self, d_optimizer, g_optimizer, loss_fn):
super(GAN, self).compile()
self.d_optimizer = d_optimizer
self.g_optimizer = g_optimizer
self.loss_fn = loss_fn
def train_step(self, real_images):
if isinstance(real_images, tuple):
real_images = real_images[0]
# Sample random points in the latent space
batch_size = tf.shape(real_images)[0]
random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
# Decode them to fake images
generated_images = self.generator(random_latent_vectors)
# Combine them with real images
combined_images = tf.concat([generated_images, real_images], axis=0)
# Assemble labels discriminating real from fake images
labels = tf.concat(
[tf.ones((batch_size, 1)), tf.zeros((batch_size, 1))], axis=0
)
# Add random noise to the labels - important trick!
labels += 0.05 * tf.random.uniform(tf.shape(labels))
# Train the discriminator
with tf.GradientTape() as tape:
predictions = self.discriminator(combined_images)
d_loss = self.loss_fn(labels, predictions)
grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
self.d_optimizer.apply_gradients(
zip(grads, self.discriminator.trainable_weights)
)
# Sample random points in the latent space
random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))
# Assemble labels that say "all real images"
misleading_labels = tf.zeros((batch_size, 1))
# Train the generator (note that we should *not* update the weights
# of the discriminator)!
with tf.GradientTape() as tape:
predictions = self.discriminator(self.generator(random_latent_vectors))
g_loss = self.loss_fn(misleading_labels, predictions)
grads = tape.gradient(g_loss, self.generator.trainable_weights)
self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))
return {"d_loss": d_loss, "g_loss": g_loss}
試運転してみましょう。
# Prepare the dataset. We use both the training & test MNIST digits.
batch_size = 64
(x_train, _), (x_test, _) = keras.datasets.mnist.load_data()
all_digits = np.concatenate([x_train, x_test])
all_digits = all_digits.astype("float32") / 255.0
all_digits = np.reshape(all_digits, (-1, 28, 28, 1))
dataset = tf.data.Dataset.from_tensor_slices(all_digits)
dataset = dataset.shuffle(buffer_size=1024).batch(batch_size)
gan = GAN(discriminator=discriminator, generator=generator, latent_dim=latent_dim)
gan.compile(
d_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
g_optimizer=keras.optimizers.Adam(learning_rate=0.0003),
loss_fn=keras.losses.BinaryCrossentropy(from_logits=True),
)
# To limit the execution time, we only train on 100 batches. You can train on
# the entire dataset. You will need about 20 epochs to get nice results.
gan.fit(dataset.take(100), epochs=1)
100/100 [==============================] - 6s 15ms/step - d_loss: 0.4212 - g_loss: 0.8618 <keras.callbacks.History at 0x7f056007f9a0>
ディープラーニングの背後にある考え方は単純なわけですから、当然、実装も単純なのです。