TensorFlow.org で表示 | Google Colab で実行 | GitHubでソースを表示 | ノートブックをダウンロード |
概要
このチュートリアルでは、Keras で DTensor を使用する方法について学習します。
DTensor を Keras と組み合わせることで、分散型機械学習モデルの構築とトレーニングに既存の Keras レイヤーとモデルを再利用することができます。
MNIST データを使用してマルチレイヤーの分類モデルをトレーニングします。サブクラス化モデル、Sequential モデル、Functional モデルのレイアウトの設定について説明します。
このチュートリアルでは、すでに「DTensor プログラミングガイド」を読んでいること、Mesh
や Layout
などの基本的な DTensor の概念に精通していることを前提としています。
このチュートリアルでは、https://www.tensorflow.org/datasets/keras_example を基盤に使用しています。
MNIST モデルをビルドする
DTensor は、TensorFlow 2.9.0 リリースに含まれています。
pip install --quiet --upgrade --pre tensorflow tensorflow-datasets
次に、tensorflow
と tensorflow.experimental.dtensor
をインポートし、8 個の仮想 CPU を使用するように TensorFlow を構成します。
この例では CPU を使用しますが、DTensor は CPU、GPU、または TPU デバイスで同じように動作します。
import tensorflow as tf
import tensorflow_datasets as tfds
from tensorflow.experimental import dtensor
2024-01-11 18:16:50.357646: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 2024-01-11 18:16:50.357693: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 2024-01-11 18:16:50.359187: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
def configure_virtual_cpus(ncpu):
phy_devices = tf.config.list_physical_devices('CPU')
tf.config.set_logical_device_configuration(
phy_devices[0],
[tf.config.LogicalDeviceConfiguration()] * ncpu)
configure_virtual_cpus(8)
tf.config.list_logical_devices('CPU')
devices = [f'CPU:{i}' for i in range(8)]
決定論的疑似乱数ジェネレータ
1 つ注意しておかなければならないのは、DTensor API では、実行中の各クライアントに同じランダムシードがある必要があることです。そうすることで、重みの初期化で決定論的動作が得られます。これは、Keras で tf.keras.utils.set_random_seed()
を使ってグローバルシードを設定することで行えます。
tf.keras.backend.experimental.enable_tf_random_generator()
tf.keras.utils.set_random_seed(1337)
データ並列メッシュを作成する
このチュートリアルでは、データ並列トレーニングを実演します。モデルの並列トレーニングと空間の並列トレーニングへの適応は、別のセットの Layout
オブジェクトに切り替えるのと同じくらい単純です。データ並列を超える分散トレーニングについての詳細は、DTensor の詳細な ML チュートリアルをご覧ください。
データ並列トレーニングは、一般的に使用される並列トレーニングですが、tf.distribute.MirroredStrategy
などによっても使用されます。
DTensor を使うと、データ並列トレーニングループは、単一の 'batch' 次元で構成される Mesh
を使用します。各デバイスは、グローバルの batch からシャードを受け取るモデルのレプリカを実行します。
mesh = dtensor.create_mesh([("batch", 8)], devices=devices)
各デバイスがモデルの完全なレプリカを実行する過程で、モデルの変数がメッシュ(シャーディングなし)間で完全に複製されます。例として、この Mesh
の階数 2 の重みに対して完全に複製されるレイアウトは、以下のようになります。
example_weight_layout = dtensor.Layout([dtensor.UNSHARDED, dtensor.UNSHARDED], mesh) # or
example_weight_layout = dtensor.Layout.replicated(mesh, rank=2)
階数 2 のデータテンソルのレイアウトは、最初の次元(batch_sharded
としても知られます)に沿ってシャーディングされます。
example_data_layout = dtensor.Layout(['batch', dtensor.UNSHARDED], mesh) # or
example_data_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
レイアウトで Keras レイヤーを作成する
データ並列スキームでは通常、モデルの各レプリカがシャーディングされた入力データを使って計算を行えるように、完全に複製されたレイアウトを使ってモデルの重みを作成します。
レイヤーの重みのレイアウト情報を構成するために、Keras では、ほとんどの組み込みレイヤーで使用できる追加のパラメータをレイヤーコンストラクタに公開しています。
以下は、完全に複製された重みレイアウトを使用する小さな画像分類モデルを構築する例です。レイアウト情報の kernel
と bias
は、kernel_layout
と bias_layout
の引数を介して tf.keras.layers.Dense
に指定できます。組み込み Keras レイヤーのほとんどは、レイアウトの重みの Layout
を明示的に指定できるようになっています。
unsharded_layout_2d = dtensor.Layout.replicated(mesh, 2)
unsharded_layout_1d = dtensor.Layout.replicated(mesh, 1)
model = tf.keras.models.Sequential([
tf.keras.layers.Flatten(input_shape=(28, 28)),
tf.keras.layers.Dense(128,
activation='relu',
name='d1',
kernel_layout=unsharded_layout_2d,
bias_layout=unsharded_layout_1d),
tf.keras.layers.Dense(10,
name='d2',
kernel_layout=unsharded_layout_2d,
bias_layout=unsharded_layout_1d)
])
レイアウト情報は、重みの layout
プロパティを調べることで確認できます。
for weight in model.weights:
print(f'Weight name: {weight.name} with layout: {weight.layout}')
break
Weight name: d1/kernel:0 with layout: Layout.from_string(sharding_specs:unsharded,unsharded, mesh:|batch=8|0,1,2,3,4,5,6,7|0,1,2,3,4,5,6,7|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5,/job:localhost/replica:0/task:0/device:CPU:6,/job:localhost/replica:0/task:0/device:CPU:7)
データセットを読み込んで入力パイプラインを構築する
MNIST データセットを読み込んで、それに使用する事前処理用の入力パイプラインを構成します。データセット自体は DTensor レイアウト情報に関連付けられていません。今後の TensorFlow リリースにおいて、tf.data
との DTensor Keras 統合が改善される予定です。
(ds_train, ds_test), ds_info = tfds.load(
'mnist',
split=['train', 'test'],
shuffle_files=True,
as_supervised=True,
with_info=True,
)
def normalize_img(image, label):
"""Normalizes images: `uint8` -> `float32`."""
return tf.cast(image, tf.float32) / 255., label
batch_size = 128
ds_train = ds_train.map(
normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(batch_size)
ds_train = ds_train.prefetch(tf.data.AUTOTUNE)
ds_test = ds_test.map(
normalize_img, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(batch_size)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.AUTOTUNE)
モデルのトレーニングロジックを定義する
次に、モデルのトレーニングロジックと評価ロジックを定義します。
TensorFlow 2.9 の時点では、DTensor が有効化された Keras モデルにカスタムトレーニングループを書き込む必要があります。これは、入力データに、Keras の標準の tf.keras.Model.fit()
または tf.keras.Model.eval()
関数には組み込まれていない適切なレイアウト情報を詰め込むために行います。さらに多くの tf.data
サポートが、今後のリリースで追加される予定です。
@tf.function
def train_step(model, x, y, optimizer, metrics):
with tf.GradientTape() as tape:
logits = model(x, training=True)
# tf.reduce_sum sums the batch sharded per-example loss to a replicated
# global loss (scalar).
loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
y, logits, from_logits=True))
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
for metric in metrics.values():
metric.update_state(y_true=y, y_pred=logits)
loss_per_sample = loss / len(x)
results = {'loss': loss_per_sample}
return results
@tf.function
def eval_step(model, x, y, metrics):
logits = model(x, training=False)
loss = tf.reduce_sum(tf.keras.losses.sparse_categorical_crossentropy(
y, logits, from_logits=True))
for metric in metrics.values():
metric.update_state(y_true=y, y_pred=logits)
loss_per_sample = loss / len(x)
results = {'eval_loss': loss_per_sample}
return results
def pack_dtensor_inputs(images, labels, image_layout, label_layout):
num_local_devices = image_layout.mesh.num_local_devices()
images = tf.split(images, num_local_devices)
labels = tf.split(labels, num_local_devices)
images = dtensor.pack(images, image_layout)
labels = dtensor.pack(labels, label_layout)
return images, labels
Metric と Optimizer
Keras Metric
と Optimizer
を使って DTensor API を使用する場合、追加のメッシュ情報を指定して、内部状態変数とテンソルがモデルの変数と連携できるようにする必要があります。
オプティマイザの場合、DTensor は
keras.dtensor.experimental.optimizers
という新しい実験的な名前空間を使用します。多くの既存の Keras Optimizer は、追加のmesh
引数を受け取るように拡張されます。今後のリリースでは、Keras のコアオプティマイザにマージされる可能性があります。指標の場合、DTensor 対応の
Metric
になるように、コンストラクタに直接引数としてmesh
を指定できます。
optimizer = tf.keras.dtensor.experimental.optimizers.Adam(0.01, mesh=mesh)
metrics = {'accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}
eval_metrics = {'eval_accuracy': tf.keras.metrics.SparseCategoricalAccuracy(mesh=mesh)}
モデルのトレーニング
以下の例では、batch 次元で入力パイプラインのデータをシャード化し、完全に複製された重みをもつモデルを使ってトレーニングします。
モデルは 3 つのエポックで、約 97% の精度を達成します。
num_epochs = 3
image_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=4)
label_layout = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
for epoch in range(num_epochs):
print("============================")
print("Epoch: ", epoch)
for metric in metrics.values():
metric.reset_state()
step = 0
results = {}
pbar = tf.keras.utils.Progbar(target=None, stateful_metrics=[])
for input in ds_train:
images, labels = input[0], input[1]
images, labels = pack_dtensor_inputs(
images, labels, image_layout, label_layout)
results.update(train_step(model, images, labels, optimizer, metrics))
for metric_name, metric in metrics.items():
results[metric_name] = metric.result()
pbar.update(step, values=results.items(), finalize=False)
step += 1
pbar.update(step, values=results.items(), finalize=True)
for metric in eval_metrics.values():
metric.reset_state()
for input in ds_test:
images, labels = input[0], input[1]
images, labels = pack_dtensor_inputs(
images, labels, image_layout, label_layout)
results.update(eval_step(model, images, labels, eval_metrics))
for metric_name, metric in eval_metrics.items():
results[metric_name] = metric.result()
for metric_name, metric in results.items():
print(f"{metric_name}: {metric.numpy()}")
============================ Epoch: 0 469/Unknown - 7s 15ms/step - loss: 0.2907 - accuracy: 0.8308 469/Unknown - 4s 9ms/step - loss: 0.1285 - accuracy: 0.9595 469/Unknown - 4s 9ms/step - loss: 0.1010 - accuracy: 0.9682 loss: 0.044021397829055786 accuracy: 0.9682833552360535 eval_loss: 0.05413995310664177 eval_accuracy: 0.9656000137329102
既存のモデルコードのレイアウトを指定する
ほとんどの場合、モデルは特定のユースケースでうまく動作するようになっているため、モデル内の個別のレイヤーに Layout
情報を指定する作業は膨大であり、多数の編集作業が必要となります。
既存の Keras モデルを DTensor API で動作できるようにするための変換作業を行いやすくするために、グローバルな観点で Layout
を指定できる新しい dtensor.LayoutMap
API を使用できます。
まず、LayoutMap
インスタンスを作成する必要があります。これは、モデルの重みに指定するすべての Layout
を構成するディクショナリのようなオブジェクトです。
LayoutMap
には、init 時に Mesh
インスタンスが必要です。これは、Layout が構成されていない、任意の重みに対するデフォルトの複製済み Layout
を指定するために使用できます。すべてのモデルの重みを完全に複製するだけの場合は、空の LayoutMap
を指定すると、デフォルトのメッシュを使って複製された Layout
が作成されます。
LayoutMap
は、文字列をキーとして、Layout
を値として使用します。通常の Python dict とこのクラスでは、動作が異なります。文字列キーは、値を取得する際の正規表現として処理されます。
Subclassed モデル
Keras のサブクラス化モデル構文を使って定義された以下のモデルについて考察してみましょう。
class SubclassedModel(tf.keras.Model):
def __init__(self, name=None):
super().__init__(name=name)
self.feature = tf.keras.layers.Dense(16)
self.feature_2 = tf.keras.layers.Dense(24)
self.dropout = tf.keras.layers.Dropout(0.1)
def call(self, inputs, training=None):
x = self.feature(inputs)
x = self.dropout(x, training=training)
return self.feature_2(x)
このモデルには、2 つの Dense
レイヤーに対し kernel
と bias
レイヤーという 4 つの重みがあります。それぞれは、オブジェクトパスに基づいてマッピングされています。
model.feature.kernel
model.feature.bias
model.feature_2.kernel
model.feature_2.bias
注意: Subclassed モデルでは、マッピングから Layout を取得する際に、レイヤーの .name
属性ではなく、属性名がキーとして使用されます。これは、tf.Module
のチェックポイント設定が使う規則と同じです。多数のレイヤーを持つ複雑なモデルでは、チェックポイントを手動で検査することで、属性のマッピングを確認できます。
では、以下の LayoutMap
を定義して、モデルを適用しましょう。
layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)
layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
with layout_map.scope():
subclassed_model = SubclassedModel()
モデルの重みは最初のセルに作成されているため、DTensor 入力でモデルを呼び出し、重みに期待されるレイアウトがあることを確認します。
dtensor_input = dtensor.copy_to_mesh(tf.zeros((16, 16)), layout=unsharded_layout_2d)
# Trigger the weights creation for subclass model
subclassed_model(dtensor_input)
print(subclassed_model.feature.kernel.layout)
Layout.from_string(sharding_specs:batch,unsharded, mesh:|batch=8|0,1,2,3,4,5,6,7|0,1,2,3,4,5,6,7|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5,/job:localhost/replica:0/task:0/device:CPU:6,/job:localhost/replica:0/task:0/device:CPU:7)
これにより、既存のコードを更新することなく、Layout
をすばやくモデルにマッピングすることができます。
Sequential モデルと Functional モデル
Keras Functional モデルと Sequential モデルの場合も、LayoutMap
を使用できます。
注意: Functional モデルと Sequential モデルでは、マッピングにわずかな違いがあります。モデルのレイヤーには、モデルに接続された公開属性がありません(ただし、model.layers
を介してリストとしてアクセス可能です)。この場合、文字列名をキーとして使用します。文字列名は、モデル内で必ず一意の値です。
layout_map = tf.keras.dtensor.experimental.LayoutMap(mesh=mesh)
layout_map['feature.*kernel'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=2)
layout_map['feature.*bias'] = dtensor.Layout.batch_sharded(mesh, 'batch', rank=1)
with layout_map.scope():
inputs = tf.keras.Input((16,), batch_size=16)
x = tf.keras.layers.Dense(16, name='feature')(inputs)
x = tf.keras.layers.Dropout(0.1)(x)
output = tf.keras.layers.Dense(32, name='feature_2')(x)
model = tf.keras.Model(inputs, output)
print(model.layers[1].kernel.layout)
Layout.from_string(sharding_specs:batch,unsharded, mesh:|batch=8|0,1,2,3,4,5,6,7|0,1,2,3,4,5,6,7|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5,/job:localhost/replica:0/task:0/device:CPU:6,/job:localhost/replica:0/task:0/device:CPU:7)
with layout_map.scope():
model = tf.keras.Sequential([
tf.keras.layers.Dense(16, name='feature', input_shape=(16,)),
tf.keras.layers.Dropout(0.1),
tf.keras.layers.Dense(32, name='feature_2')
])
print(model.layers[2].kernel.layout)
Layout.from_string(sharding_specs:batch,unsharded, mesh:|batch=8|0,1,2,3,4,5,6,7|0,1,2,3,4,5,6,7|/job:localhost/replica:0/task:0/device:CPU:0,/job:localhost/replica:0/task:0/device:CPU:1,/job:localhost/replica:0/task:0/device:CPU:2,/job:localhost/replica:0/task:0/device:CPU:3,/job:localhost/replica:0/task:0/device:CPU:4,/job:localhost/replica:0/task:0/device:CPU:5,/job:localhost/replica:0/task:0/device:CPU:6,/job:localhost/replica:0/task:0/device:CPU:7)