TensorFlow.org で表示 | Google Colabで実行 | GitHubでソースを表示 | ノートブックをダウンロード |
概要
このチュートリアルでは、tf.distribute.Strategy
API を使用して、Keras モデルと Model.fit
API によるマルチワーカー分散型トレーニングを実演します。このストラテジーにより、単一のワーカーで実行するように設計された Keras モデルは、最小限のコード変更で複数のワーカーでシームレスに機能することができます。
To learn how to use the MultiWorkerMirroredStrategy
with Keras and a custom training loop, refer to Custom training loop with Keras and MultiWorkerMirroredStrategy.
このチュートリアルには、デモ用に 2 つのワーカーを含む最小限のマルチワーカーの例が含まれています。
適切なストラテジーを選択する
始める前に、アクセラレータとトレーニングに tf.distribute.MultiWorkerMirroredStrategy
が適切な選択であることを確認してください。これらは、データ並列処理を使用してトレーニングを分散する 2 つの一般的な方法です。
- 同期トレーニング。
tf.distribute.MirroredStrategy
、tf.distribute.TPUStrategy
およびtf.distribute.MultiWorkerMirroredStrategy
などのトレーニングステップがワーカーとレプリカ間で同期されます。すべてのワーカーは、入力データの異なるスライスを同期してトレーニングし、各ステップで勾配を集約します。 - 非同期トレーニング。
tf.distribute.experimental.ParameterServerStrategy
など、トレーニングステップが厳密に同期されていません。すべてのワーカーは、入力データを個別にトレーニングし、変数を非同期的に更新します。
TPU を使用しないマルチワーカーの同期トレーニングには、tf.distribute.experimental.MultiWorkerMirroredStrategy
を使用します。これは、すべてのワーカーの各デバイスにあるモデルのレイヤーにすべての変数のコピーを作成します。集合通信に使用する TensorFlow 演算子 CollectiveOps
を使用して勾配を集め、変数の同期を維持します。集合実装オプションについては、tf.distribute.experimental.CommunicationOptions
パラメータを確認してください。
tf.distribute.Strategy
API の概要については、TensorFlow での分散トレーニングを参照してください。
セットアップ
まず、必要なものをインポートします。
import json
import os
import sys
TensorFlow をインポートする前に、環境にいくつかの変更を加えます。
- 実際のアプリケーションでは、各ワーカーは異なるマシン上にあります。このチュートリアルでは、すべてのワーカーがこのマシンで実行されます。そのため、すべての GPU を無効にして、すべてのワーカーが同じ GPU を使用しようとすることによって発生するエラーを防ぎます。
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
TF_CONFIG
環境変数をリセットします(これについては後で詳しく説明します)。
os.environ.pop('TF_CONFIG', None)
- 現在のディレクトリが Python のパス上にあることを確認してください。これにより、ノートブックは
%%writefile
で書き込まれたファイルを後でインポートできるようになります。
if '.' not in sys.path:
sys.path.insert(0, '.')
tf-nightly
をインストールします。TensorFlow 2.10 から tf.keras.callbacks.BackupAndRestore
の save_freq
引数を使用した特定のステップでのチェックポイント保存頻度が導入されます。
pip install tf-nightly
最後に、TensorFlow をインポートします。
import tensorflow as tf
2022-12-14 20:00:31.684068: E tensorflow/tsl/lib/monitoring/collection_registry.cc:81] Cannot register 2 metrics with the same name: /tensorflow/core/bfc_allocator_delay
データセットとモデルの定義
次に、単純なモデルとデータセットの設定を使用して mnist.py
ファイルを作成します。この Python ファイルは、このチュートリアルのワーカープロセスによって使用されます。
%%writefile mnist_setup.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the [0, 255] range.
# You need to convert them to float32 with values in the [0, 1] range.
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
return train_dataset
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
Writing mnist_setup.py
シングルワーカーでのモデルのトレーニング
まず、少数のエポックでモデルをトレーニングし、シングルワーカーで結果を観察して、すべてが正しく機能していることを確認します。エポックが進むにつれ、損失が下降し、精度が 1.0 に近づくはずです。
import mnist_setup
batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz 11490434/11490434 [==============================] - 0s 0us/step 2022-12-14 20:00:34.473671: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected Epoch 1/3 70/70 [==============================] - 1s 7ms/step - loss: 2.2720 - accuracy: 0.2181 Epoch 2/3 70/70 [==============================] - 0s 6ms/step - loss: 2.1978 - accuracy: 0.4471 Epoch 3/3 70/70 [==============================] - 0s 6ms/step - loss: 2.1061 - accuracy: 0.5902 <keras.callbacks.History at 0x7fc1c84ef6a0>
マルチワーカー構成
では、マルチワーカートレーニングの世界を覗いてみましょう。
ジョブとタスクのクラスタ
TensorFlow では、分散トレーニングには、いくつかのジョブが含まれる 'cluster'
があり、各ジョブには 1 つ以上の 'task'
が含まれることがあります。
それぞれに異なる役割をもつ複数のマシンでトレーニングするには TF_CONFIG
環境変数が必要です。TF_CONFIG
は JSON 文字列で、クラスタの一部である各ワーカーのクラスタ構成を指定するために使用されます。
TF_CONFIG
変数には、'cluster'
と 'task'
の 2 つのコンポーネントがあります。
'cluster'
はすべてのワーカーに共通し、トレーニングクラスタに関する情報を、'worker'
または'chief'
などのさまざまなジョブの種類で構成される dict として提供します。tf.distribute.MultiWorkerMirroredStrategy
によるマルチワーカートレーニングでは通常、'worker'
が通常行うことのほかにチェックポイントの保存や TensorBoard 用のサマリーファイルの書き込みといった役割を果たす 1 つの'worker'
があります。こういった'worker'
はチーフワーカー (ジョブ名は'chief'
) と呼ばれます。- 通常、
'index'
0
を持つワーカーが'chief'
になります。
'task'
は現在のタスクの情報を提供し、ワーカーごとに異なります。タスクはそのワーカーの'type'
と'index'
を指定します。
以下に構成例を示します。
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
tf_config
は Python の単なるローカル変数であることに注意してください。トレーニング構成に使用するには、JSON としてシリアル化し、'TF_CONFIG'
環境変数に配置します。
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
上記の構成例では、タスク 'type'
を 'worker'
に設定し、タスク 'index'
を 0
に設定しています。そのため、このマシンが最初のワーカーとなり、'chief'
ワーカーとして指定されます。
注意: 他のマシンにも TF_CONFIG
環境変数を設定し、同じ 'cluster'
dict が必要となりますが、それらのマシンの役割に応じた異なるタスク 'type'
またはタスク 'index'
が必要となります。
実際には、外部 IP アドレス/ポートに複数のワーカーを作成し、それに応じて各ワーカーに TF_CONFIG
変数を設定します。このチュートリアルでは、デモとして localhost
で 2 つのワーカーを使用して TF_CONFIG
変数を設定する方法を示します。
- 最初の (
'chief'
) ワーカーのTF_CONFIG
は上記に示す通りです。 - 2 つ目のワーカーでは、
tf_config['task']['index']=1
を設定します。
ノートブックの環境変数とサブプロセス
サブプロセスは、親から環境変数を継承します。したがって、この Jupyter Notebook プロセスで環境変数を設定すると、次のようになります。
os.environ['GREETINGS'] = 'Hello TensorFlow!'
すると、サブプロセスからその環境変数にアクセスできます。
echo ${GREETINGS}
Hello TensorFlow!
次のセクションでは、これを使用して TF_CONFIG
をワーカーサブプロセスに渡します。この方法で実際にジョブを起動することは決してありませんが、このチュートリアルで最小限のマルチワーカーの例を示すためには十分です。
モデルのトレーニング
モデルをトレーニングするには、まず tf.distribute.MultiWorkerMirroredStrategy
のインスタンスを作成します。
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
注意: MultiWorkerMirroredStrategy
が呼び出されると、TF_CONFIG
が解析され、TensorFlow の GRPC サーバーが開始します。そのため、TF_CONFIG
環境変数は、tf.distribute.Strategy
インスタンスが作成される前に設定しておく必要があります。TF_CONFIG
はまだ設定されていないため、上記のストラテジーは実質的にシングルワーカーのトレーニングです。
tf.keras
にtf.distribute.Strategy
API を統合したため、トレーニングをマルチワーカーに分散するには、モデルビルディングと model.compile()
呼び出しを strategy.scope()
内に収めるように変更することだけが必要となりました。この分散ストラテジーのスコープは、どこでどのように変数が作成されるかを指定し、MultiWorkerMirroredStrategy
の場合、作成される変数は MirroredVariable
で、各ワーカーに複製されます。
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
注意: 現在のところ、MultiWorkerMirroredStrategy
には、TensorFlow 演算子をストラテジーのインスタンスが作成された後に作成する必要があるという制限があります。RuntimeError: Collective ops must be configured at program startup
が表示される場合は、プログラムのはじめに MultiWorkerMirroredStrategy
のインスタンスを作成するようにし、演算子を作成するコードをストラテジーがインスタンス化される後に配置するようにしてください。
MultiWorkerMirroredStrategy
で実際に実行するには、ワーカープロセスを実行し、TF_CONFIG
をそれらに渡す必要があります。
前に記述したmnist_setup.py
ファイルと同様に、各ワーカーが実行するmain.py
は次のとおりです。
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist_setup
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
strategy = tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py
上記のコードスニペットでは、Dataset.batch
に渡されるglobal_batch_size
がper_worker_batch_size * num_workers
に設定されていることに注意してください。これにより、ワーカーの数に関係なく、各ワーカーがper_worker_batch_size
の例のバッチを処理するようになります。
現在のディレクトリには、両方の Python ファイルが含まれています。
ls *.py
main.py mnist_setup.py
JSON は TF_CONFIG
をシリアル化し、環境変数に追加します。
os.environ['TF_CONFIG'] = json.dumps(tf_config)
これで、main.py
を実行し、TF_CONFIG
を使用するワーカープロセスを起動できます。
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
上記のコマンドについて注意すべき点がいくつかあります。
- ノートブック 「マジック」 である
%%bash
を使用して、いくつかの bash コマンドを実行します。 - このワーカーは終了しないため、
--bg
フラグを使用してbash
プロセスをバックグラウンドで実行します。 このワーカーは始める前にすべてのワーカーを待ちます。
バックグラウンドのワーカープロセスはこのノートブックに出力を出力しないため、&>
で出力をファイルにリダイレクトし、何が起こったかを検査できます。
プロセスが開始するまで数秒待ちます。
import time
time.sleep(10)
これまでにワーカーのログファイルに出力されたものを検査します。
cat job_0.log
2022-12-14 20:00:37.557684: E tensorflow/tsl/lib/monitoring/collection_registry.cc:81] Cannot register 2 metrics with the same name: /tensorflow/core/bfc_allocator_delay 2022-12-14 20:00:39.352865: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
ログファイルの最後の行は Started server with target: grpc://localhost:12345
であるはずです。最初のワーカーは準備が整い、他のすべてのワーカーの準備が整うのを待っています。
2 番目のワーカーのプロセスを始めるように tf_config
を更新します。
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
2番目のワーカーを起動します。すべてのワーカーがアクティブであるため、これによりトレーニングが開始されます(したがって、このプロセスをバックグラウンドで実行する必要はありません)。
python main.py
2022-12-14 20:00:47.666158: E tensorflow/tsl/lib/monitoring/collection_registry.cc:81] Cannot register 2 metrics with the same name: /tensorflow/core/bfc_allocator_delay 2022-12-14 20:00:49.465809: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-12-14 20:00:50.424484: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } attr { key: "replicate_on_split" value { b: false } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-12-14 20:00:50.702584: W tensorflow/core/framework/dataset.cc:807] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch 1/3 70/70 [==============================] - 6s 42ms/step - loss: 2.2913 - accuracy: 0.1316 Epoch 2/3 70/70 [==============================] - 3s 41ms/step - loss: 2.2551 - accuracy: 0.2313 Epoch 3/3 70/70 [==============================] - 3s 40ms/step - loss: 2.2144 - accuracy: 0.3311
最初のワーカーにより書き込まれたログを再確認すると、そのモデルのトレーニングに参加していることがわかります。
cat job_0.log
2022-12-14 20:00:37.557684: E tensorflow/tsl/lib/monitoring/collection_registry.cc:81] Cannot register 2 metrics with the same name: /tensorflow/core/bfc_allocator_delay 2022-12-14 20:00:39.352865: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-12-14 20:00:50.422449: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } attr { key: "replicate_on_split" value { b: false } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-12-14 20:00:50.700553: W tensorflow/core/framework/dataset.cc:807] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch 1/3 70/70 [==============================] - 6s 42ms/step - loss: 2.2913 - accuracy: 0.1316 Epoch 2/3 70/70 [==============================] - 3s 41ms/step - loss: 2.2551 - accuracy: 0.2313 Epoch 3/3 70/70 [==============================] - 3s 40ms/step - loss: 2.2144 - accuracy: 0.3311
注意: 1 台のマシンで複数のワーカーを実行するとオーバーヘッドが増えるため、これはこのチュートリアルの最初に実行されたテストよりも実行に時間がかかる可能性があります。ここでの目標は、トレーニング時間を改善することではなく、マルチワーカートレーニングの例を示すことです。
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
マルチワーカートレーニングの詳細
このチュートリアルでは、基本的なマルチワーカーのセットアップについて説明してきました。このドキュメントの残りの部分では、実際のユースケースに役立つ他の要因について詳しく見ていきます。
データセットのシャーディング
マルチワーカートレーニングでは、コンバージェンスとパフォーマンスを確保するために、データセットのシャーディングが必要です。
前のセクションの例は、tf.distribute.Strategy
API により提供されるデフォルトの自動シャーディングに依存しています。tf.data.experimental.DistributeOptions
の tf.data.experimental.AutoShardPolicy
を設定することで、シャーディングを制御できます。
自動シャーディングの詳細については、分散入力ガイドをご覧ください。
自動シャーディングをオフにして、各レプリカがすべての例を処理する方法の簡単な例を次に示します(推奨されません)。
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
評価する
validation_data
を Model.fit
に渡すと、エポックごとにトレーニングと評価が交互に行われるようになります。評価は同じセットのワーカー間で分散されているため、評価結果はすべてのワーカーが使用できるように集計されます。
トレーニングと同様に、評価データセットもファイルレベルで自動的にシャーディングされます。評価データセットにグローバルバッチサイズを設定し、validation_steps
を設定する必要があります。
繰り返しのデータセットを評価することをお勧めします (tf.data.Dataset.repeat
を呼び出します) 。
または、定期的にチェックポイントを読み取って評価を実行するもう 1 つのタスクを作成することもできます。これは Estimator が行うことですが、推奨される評価方法ではないため、ここでは詳細については触れません。
パフォーマンス
マルチワーカートレーニングのパフォーマンスを調整するには、次を行うことができます。
tf.distribute.MultiWorkerMirroredStrategy
には複数の集合体通信実装が用意されています。RING
は、クロスホスト通信レイヤーとして、gRPC を使用したリング状の集合体を実装します。NCCL
は NVIDIA Collective Communication Library を使用して集合体を実装します。AUTO
は、選択をランタイムに任せます。
集合体の最適な実装は、GPU の数、GPU の種類、およびクラスタ内のネットワーク相互接続によって異なります。自動選択をオーバーライドするには、
MultiWorkerMirroredStrategy
のコンストラクタのcommunication_options
パラメータを以下のようにして指定します。communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
可能であれば、変数を
tf.float
にキャストします。- 公式の ResNet モデルには、どのようにしてこれを行うかの例が示されています。
フォールトトレランス
同期トレーニングでは、ワーカーが 1 つでも失敗し、障害復旧の仕組みが存在しない場合、クラスタは失敗します。
Keras をtf.distribute.Strategy
で使用する場合、ワーカーが停止した場合や不安定である際に、フォールトトラレンスが機能するというメリットがあります。この機能は、指定された分散ファイルシステムにトレーニングの状態を保存するため、失敗、または、中断されたインスタンスを再開する場合に、トレーニングの状態が復旧されます。
ワーカーが使用できなくなると、他のワーカーはエラーを発生します (おそらくタイムアウト後)。 このような場合、使用できないワーカー、およびエラーが発生した他のワーカーを再起動する必要があります。
注意: 以前は、ModelCheckpoint
コールバックには、マルチワーカートレーニングに失敗したジョブを再開したときに、トレーニングの状態を復元するメカニズムがありました。新たに導入される BackupAndRestore
コールバックでは、一貫したエクスペリエンスを提供するために、シングルワーカートレーニングにもこのサポートが追加され、既存の ModelCheckpoint
コールバックからフォールトトレランス機能が削除されました。今後、この動作に依存するアプリケーションは、新しい BackupAndRestore
コールバックに移行する必要があります。
ModelCheckpoint
コールバック
ModelCheckpoint
コールバックは、フォールトトレランス機能を提供しなくなりました。代わりに BackupAndRestore
コールバックを使用してください。
ModelCheckpoint
コールバックを使用してチェックポイントを保存することは、依然として可能です。ただし、これを使用する場合、トレーニングが中断されるか、問題なく終了した場合、チェックポイントからトレーニングを続行するには、手動でモデルを読み込まなければなりません。
オプションで、ユーザーは ModelCheckpoint
コールバックの外部でモデル/重みを保存および復元することを選択できます。
モデルの保存と読み込み
model.save
または tf.saved_model.save
を使用してモデルを保存するには、ワーカーごとに異なる保存先が必要となります。
- チーフワーカー以外のワーカーの場合、モデルを一時ディレクトリに保存する必要があります。
- チーフワーカーの場合、指定されたモデルのディレクトリに保存する必要があります。
ワーカーの一時ディレクトリは、複数のワーカーが同じ場所に書き込もうとしてエラーが発生しないように、一意のディレクトリである必要があります。
すべてのディレクトリに保存されるモデルは同一のものであり、復元やサービングで参照されるのは一般的に、チーフワーカーが保存したモデルです。
トレーニングが完了したらワーカーが作成した一時ディレクトリを削除するクリーンアップロジックを用意しておく必要があります。
チーフとワーカーを同時に保存する必要があるのは、チェックポイント中に変数を集計する可能性があり、チーフとワーカーの両方が allreduce 通信プロトコルに参加する必要があるためです。しかしながら、チーフとワーカーを同じモデルディレクトリに保存すると競合が発生し、エラーとなります。
MultiWorkerMirroredStrategy
を使用すると、プログラムはワーカーごとに実行され、現在のワーカーがチーフであるかを知る際には、task_type
と task_id
の属性があるクラスタレゾルバオブジェクトが利用されます。
task_type
から、現在のジョブが何であるか ('worker'
など) を知ることができます。task_id
から、ワーカーの ID を得られます。task_id == 0
のワーカーはチーフワーカーです。
以下のコードスニペットの write_filepath
関数は、書き込みのファイルパスを指定します。このパスはワーカーの task_id
によって異なります。
- チーフワーカー(
task_id == 0
)の場合は、元のファイルパスに書き込みます。 - それ以外のワーカーの場合は、書き込むディレクトリパスに
task_id
を指定して、一時ディレクトリ(temp_dir
)を作成します。
model_path = '/tmp/keras-model'
def _is_chief(task_type, task_id):
# Note: there are two possible `TF_CONFIG` configurations.
# 1) In addition to `worker` tasks, a `chief` task type is use;
# in this case, this function should be modified to
# `return task_type == 'chief'`.
# 2) Only `worker` task type is used; in this case, worker 0 is
# regarded as the chief. The implementation demonstrated here
# is for this case.
# For the purpose of this Colab section, the `task_type` is `None` case
# is added because it is effectively run with only a single worker.
return (task_type == 'worker' and task_id == 0) or task_type is None
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
これで、保存の準備ができました。
multi_worker_model.save(write_model_path)
WARNING:absl:Found untraced functions such as _jit_compiled_convolution_op, _update_step_xla while saving (showing 2 of 2). These functions will not be directly callable after loading. INFO:tensorflow:Assets written to: /tmp/keras-model/assets INFO:tensorflow:Assets written to: /tmp/keras-model/assets
前述したように、後でモデルを読み込む場合、チーフが保存した場所にあるモデルのみを使用するべきなので、非チーフワーカーが保存した一時的なモデルは削除します。
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(os.path.dirname(write_model_path))
読み込む際に便利な tf.keras.models.load_model
API を使用して、以降の作業に続けることにします。
ここでは、シングルワーカーのみを使用してトレーニングを読み込んで続けると仮定します。この場合、別の strategy.scope()
内で tf.keras.models.load_model
を呼び出しません (前に定義したように、strategy = tf.distribute.MultiWorkerMirroredStrategy()
です)。
loaded_model = tf.keras.models.load_model(model_path)
# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2 20/20 [==============================] - 1s 7ms/step - loss: 2.2914 - accuracy: 0.2000 Epoch 2/2 20/20 [==============================] - 0s 7ms/step - loss: 2.2754 - accuracy: 0.2422 <keras.callbacks.History at 0x7fc2c09f9670>
チェックポイントの保存と復元
一方、チェックポイントを作成すれば、モデルの重みを保存し、モデル全体を保存せずともそれらを復元することが可能です。
ここでは、モデルをトラッキングする tf.train.Checkpoint
を 1 つ作成します。これは tf.train.CheckpointManager
によって管理されるため、最新のチェックポイントのみが保存されます。
checkpoint_dir = '/tmp/ckpt'
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
CheckpointManager
の準備ができたら、チェックポイントを保存し、チーフ以外のワーカーが保存したチェックポイントを削除します。
checkpoint_manager.save()
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(write_checkpoint_dir)
これで、復元する必要があれば、便利なtf.train.latest_checkpoint
関数を使用して、保存された最新のチェックポイントを見つけることができるようになりました。チェックポイントが復元されると、トレーニングを続行することができます。
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-12-14 20:01:05.395997: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } attr { key: "replicate_on_split" value { b: false } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/2 2022-12-14 20:01:05.654507: W tensorflow/core/framework/dataset.cc:807] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. 20/20 [==============================] - 3s 7ms/step - loss: 2.2941 - accuracy: 0.2281 Epoch 2/2 20/20 [==============================] - 0s 7ms/step - loss: 2.2783 - accuracy: 0.2648 <keras.callbacks.History at 0x7fc2c0a6c880>
BackupAndRestore
コールバック
tf.keras.callbacks.BackupAndRestore
コールバックはフォールトトレランス機能を提供します。この機能はモデルと現在のエポック番号を一時チェックポイントファイルに backup_dir
引数でバックアップし、BackupAndRestore
でコールバックします。
注意: Tensorflow 2.9 では、現在のモデルとトレーニング状態がエポック境界でバックアップされます。 tf-nightly
バージョンおよび TensorFlow 2.10 以降では、BackupAndRestore
コールバックはエポックまたはステップ境界でモデルとトレーニング状態をバックアップします。BackupAndRestore
は、オプションの save_freq
引数を受け入れます。save_freq
は、'epoch'
または int
値のいずれかを受け入れます。save_freq
が 'epoch'
に設定されている場合、モデルはエポックごとにバックアップされます。save_freq
が 0
より大きい整数値に設定されている場合、モデルは save_freq
バッチごとにバックアップされます。
ジョブが中断されて再開されると、BackupAndRestore
コールバックが最後のチェックポイントを復元し、トレーニング状態が最後に保存されたエポックとステップの最初からトレーニングを続行できます。
これを使用するには、Model.fit
呼び出し時に、 Model.fit
のインスタンスを指定します。
MultiWorkerMirroredStrategy
では、ワーカーが中断されると、そのワーカーが再開するまでクラスタ全体が一時停止されます。そのワーカーが再開すると他のワーカーも再開します。中断したワーカーがクラスタに参加し直すと、各ワーカーは以前に保存されたチェックポイントファイルを読み取って以前の状態を復元するため、クラスタの同期状態が戻ります。そして、トレーニングが続行されます。分散データセットの反復子の状態は再初期化され、復元されません。
The BackupAndRestore
callback uses the CheckpointManager
to save and restore the training state, which generates a file called checkpoint that tracks existing checkpoints together with the latest one. For this reason, backup_dir
should not be re-used to store other checkpoints in order to avoid name collision.
現在、BackupAndRestore
コールバックは、ストラテジーなしのシングルワーカートレーニング(MirroredStrategy
)と MultiWorkerMirroredStrategy
によるマルチワーカートレーニングをサポートしています。
Below are two examples for both multi-worker training and single-worker training:
# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback. The training state
# is backed up at epoch boundaries by default.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
2022-12-14 20:01:08.689450: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } attr { key: "replicate_on_split" value { b: false } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/3 70/70 [==============================] - 3s 8ms/step - loss: 2.2696 - accuracy: 0.2268 Epoch 2/3 70/70 [==============================] - 1s 8ms/step - loss: 2.2096 - accuracy: 0.3013 Epoch 3/3 70/70 [==============================] - 1s 8ms/step - loss: 2.1317 - accuracy: 0.4317 <keras.callbacks.History at 0x7fc2be65ddc0>
BackupAndRestore
コールバックの save_freq
引数が 'epoch'
に設定されている場合、モデルはエポックごとにバックアップされます。
# The training state is backed up at epoch boundaries because `save_freq` is
# set to `epoch`.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
2022-12-14 20:01:13.452703: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } attr { key: "replicate_on_split" value { b: false } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/3 70/70 [==============================] - 3s 8ms/step - loss: 2.2786 - accuracy: 0.1719 Epoch 2/3 70/70 [==============================] - 1s 8ms/step - loss: 2.2316 - accuracy: 0.3955 Epoch 3/3 70/70 [==============================] - 1s 8ms/step - loss: 2.1827 - accuracy: 0.5406 <keras.callbacks.History at 0x7fc2c0b98f70>
注意: 次のコード ブロックでは、Tensorflow 2.10 がリリースされるまで tf-nightly
でのみ利用可能な機能を使用します。
BackupAndRestore
コールバックの save_freq
引数が 0
より大きい整数値に設定されている場合、モデルは save_freq
バッチごとにバックアップされます。
# The training state is backed up at every 30 steps because `save_freq` is set
# to an integer value of `30`.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup', save_freq=30)]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
2022-12-14 20:01:18.217652: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } attr { key: "replicate_on_split" value { b: false } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/3 70/70 [==============================] - 3s 9ms/step - loss: 2.2709 - accuracy: 0.1958 Epoch 2/3 70/70 [==============================] - 1s 9ms/step - loss: 2.1935 - accuracy: 0.4922 Epoch 3/3 70/70 [==============================] - 1s 10ms/step - loss: 2.1001 - accuracy: 0.6498 <keras.callbacks.History at 0x7fc2be5221c0>
BackupAndRestore
に指定した backup_dir
のディレクトリを検査すると、一時的に生成されたチェックポイントファイルがいくつかあることに気づくでしょう。これらのファイルは、以前に失われたインスタンスの復元に必要なもので、トレーニングが正常に終了した時点で、Model.fit
の最後にライブラリによって削除されます。
注意: 現在、BackupAndRestore
コールバックは eager モードのみをサポートしています。グラフ モードでは、モデルの保存と復元に Model.save
/tf.saved_model.save
と tf.keras.models.load_model
を使用することを検討してください。それぞれ、上記のモデルの保存と読み込みセクションで説明されています。トレーニング中に Model.fit
で initial_epoch
を提供します。
追加リソース
- TensorFlow での分散型トレーニングガイドでは、利用可能な分散ストラテジーの概要が説明されています。
- Keras によるカスタムトレーニングループと MultiWorkerMirroredStrategy のチュートリアルでは、Keras とカスタムトレーニングループで
MultiWorkerMirroredStrategy
を使用する方法が説明されています。 - 公式モデルをご覧ください。この多くは、複数の分散ストラテジーを実行するように構成できます。
- tf.function を使ったパフォーマンスの改善ガイドでは、その他のストラテジーや、TensorFlow モデルのパフォーマンスを最適化するために使用できる TensorFlow Profiler といったツールに関する情報が提供されています。