TensorFlow.org에서 보기 | Google Colab에서 실행하기 | GitHub에서 소스 보기 | 노트북 다운로드하기 |
개요
이 튜토리얼에서는 tf.distribute.MultiWorkerMirroredStrategy API를 사용하여 tf.distribute.MultiWorkerMirroredStrategy
모델 및 Model.fit
API로 다중 작업자 분산 훈련을 수행하는 방법을 보여줍니다. 이 전략을 이용하면 단일 작업자에서 실행되도록 설계된 Keras 모델을 최소한의 코드 변경만으로 여러 작업자에서 원활하게 작동하도록 할 수 있습니다.
Keras 및 사용자 정의 훈련 루프와 함께 MultiWorkerMirroredStrategy
를 사용하는 방법에 대해 알아보려면 Keras 및 MultiWorkerMirroredStrategy를 이용한 사용자 정의 훈련 루프를 참조하세요.
이 튜토리얼에는 데모용으로 두 개의 작업자가 있는 최소 다중 작업자 예제가 포함되어 있습니다.
적절한 전략 선택
시작하기 전에 tf.distribute.MultiWorkerMirroredStrategy
가 해당 가속기와 훈련에 적합한 선택인지 확인하세요. 다음은 데이터 병렬 처리로 훈련을 배포하는 두 가지 일반적인 방법입니다.
- 첫 번째는
tf.distribute.MirroredStrategy
,tf.distribute.TPUStrategy
및tf.distribute.MultiWorkerMirroredStrategy
와 같이 훈련 단계가 작업자와 복제본 간에 동기화되는 동기식 훈련입니다. 모든 작업자는 동기화된 입력 데이터의 다른 조각에 대해 훈련하고 각 단계에서 그래디언트를 집계합니다. - 두 번째는
tf.distribute.experimental.ParameterServerStrategy
와 같이 훈련 단계가 엄격하게 동기화되지 않은 비동기 훈련입니다. 모든 작업자는 입력 데이터에 대해 독립적으로 훈련하고 변수를 비동기적으로 업데이트합니다.
TPU가 없는 다중 작업자 동기식 훈련을 찾고 있다면 tf.distribute.MultiWorkerMirroredStrategy
를 선택해야 합니다. 이는 모든 작업자에 걸쳐 각 장치의 모델 레이어에 있는 모든 변수의 복사본을 만들고, 집합적 통신을 위한 TensorFlow 연산인 CollectiveOps
를 사용하여 그래디언트를 집계하고 변수를 동기화 상태로 유지합니다. 관심 있는 분들은 집합적 구현 옵션에 대한 tf.distribute.experimental.CommunicationOptions
매개변수를 확인해 보세요.
tf.distribute.Strategy
API에 대한 개요는 TensorFlow의 분산 훈련을 참조하세요.
설정
몇 가지 필요한 가져오기로 시작합니다.
import json
import os
import sys
TensorFlow를 가져오기 전에 환경을 일부 변경합니다.
- 실제 적용 상황에서 각 작업자는 다른 시스템에 있을 것입니다. 이 튜토리얼의 목적상 모든 작업자는 이 시스템에서 실행됩니다. 따라서 모든 작업자가 동일한 GPU를 사용하려고 하여 오류가 발생하지 않도록 모든 GPU를 비활성화합니다.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
TF_CONFIG
환경 변수를 재설정합니다(나중에 자세히 알아볼 것입니다).
os.environ.pop('TF_CONFIG', None)
- 현재 디렉터리가 Python의 경로에 있도록 합니다. 그러면 나중에
%%writefile
이 쓴 파일을 노트북이 가져올 수 있습니다.
if '.' not in sys.path:
sys.path.insert(0, '.')
tf.keras.callbacks.BackupAndRestore
의 save_freq
인수를 사용하여 특정 단계에서 체크포인트의 빈도를 저장하는 기능이 TensorFlow 2.10부터 도입되었으므로 tf-nightly
를 설치합니다.
pip install tf-nightly
마지막으로 TensorFlow를 가져옵니다.
import tensorflow as tf
2022-12-15 01:43:39.072372: E tensorflow/tsl/lib/monitoring/collection_registry.cc:81] Cannot register 2 metrics with the same name: /tensorflow/core/bfc_allocator_delay
데이터세트 및 모델 정의
다음으로, 간단한 모델 및 데이터세트 설정으로 mnist_setup.py
파일을 만듭니다. 이 Python 파일은 이 튜토리얼에서 작업자 프로세스에 사용됩니다.
%%writefile mnist_setup.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the [0, 255] range.
# You need to convert them to float32 with values in the [0, 1] range.
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
return train_dataset
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
Writing mnist_setup.py
단일 작업자에 대한 모델 훈련
먼저 몇 개의 epoch에 대해 모델을 훈련하고 단일 작업자의 결과를 관찰하여 이상이 없는지 확인합니다. 훈련이 진행됨에 따라 손실은 감소하고 정확도는 증가합니다.
import mnist_setup
batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
2022-12-15 01:43:42.188844: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected Epoch 1/3 70/70 [==============================] - 1s 7ms/step - loss: 2.2791 - accuracy: 0.1067 Epoch 2/3 70/70 [==============================] - 0s 7ms/step - loss: 2.2241 - accuracy: 0.3194 Epoch 3/3 70/70 [==============================] - 0s 7ms/step - loss: 2.1610 - accuracy: 0.4893 <keras.callbacks.History at 0x7f2717617160>
다중 작업자 구성
이제 다중 작업자 훈련의 세계로 들어가 보겠습니다.
작업 및 태스크가 있는 클러스터
TensorFlow에서 분산 훈련에는 여러 작업이 있는 'cluster'
가 포함되며 각 작업에는 하나 이상의 'task'
가 있을 수 있습니다.
각각의 역할이 다를 수 있는 여러 머신에서의 훈련을 위해 TF_CONFIG
구성 환경 변수가 필요합니다. TF_CONFIG
는 클러스터의 일부인 각 작업자에 대한 클러스터 구성을 지정하는 데 사용되는 JSON 문자열입니다.
TF_CONFIG
변수에는 'cluster'
및 'task'
의 두 가지 구성요소가 있습니다.
'cluster'
는 모든 작업자에 대해 동일하며'worker'
또는'chief'
와 같은 다양한 유형의 작업으로 구성된 사전인 훈련 클러스터에 대한 정보를 제공합니다.tf.distribute.MultiWorkerMirroredStrategy
를 이용한 다중 작업자 훈련의 경우, 일반적으로 하나의'worker'
가 있으며, 이 작업자가 정규'worker'
가 수행하는 작업과 더불어 체크포인트를 저장하고 TensorBoard에 대한 요약 파일을 작성하는 등의 작업을 책임집니다. 이러한'worker'
를 책임 작업자라고 합니다(작업 이름은'chief'
).'index'
0
인 작업자가'chief'
가 되는 것이 일반적입니다.
'task'
는 현재 작업에 대한 정보를 제공하며 작업자마다 다릅니다. 이를 통해 해당 작업자의'type'
과'index'
가 지정됩니다.
다음은 구성의 예입니다.
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
tf_config
는 단순히 Python의 지역 변수입니다. 훈련 구성에 사용하려면 JSON으로 직렬화하고 TF_CONFIG
환경 변수에 배치합니다.
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
위의 구성 예에서는 'type'
작업을 'worker'
로 설정하고 'index'
작업을 0
으로 설정했습니다. 따라서 이 시스템는 첫 번째 작업자이며, 'chief'
작업자로 지정됩니다.
참고: 다른 시스템에도 TF_CONFIG
환경 변수가 설정되어 있어야 하며 동일한 'cluster'
사전이 있어야 하지만 해당 시스템의 역할에 따라 다른 작업 'type'
또는 작업 'index'
를 갖습니다.
실제로 외부 IP 주소/포트에 여러 작업자를 만들고 그에 따라 각 작업자에 TF_CONFIG
변수를 설정합니다. 설명을 위해 이 튜토리얼에서는 localhost
에 두 작업자로 TF_CONFIG
변수를 설정하는 방법을 보여줍니다.
- 위와 같이 첫 번째(
'chief'
) 작업자의TF_CONFIG
- 두 번째 작업자의 경우
tf_config['task']['index']=1
을 설정합니다.
노트북의 환경 변수 및 하위 프로세스
하위 프로세스는 상위 요소로부터 환경 변수를 상속합니다. 따라서 이 Jupyter Notebook 프로세스에서 다음과 같이 환경 변수를 설정하는 경우:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
... 하위 프로세스에서 환경 변수에 액세스할 수 있습니다.
echo ${GREETINGS}
Hello TensorFlow!
다음 섹션에서는 이 메서드를 사용하여 TF_CONFIG
를 작업자 하위 프로세스에 전달합니다. 실제 시나리오에서는 이런 식으로 작업을 시작하지 않습니다. 이 튜토리얼은 단순히 최소한의 다중 작업자 예제로 작업을 수행하는 방법을 보여줍니다.
모델 훈련하기
모델을 훈련하려면 먼저 tf.distribute.MultiWorkerMirroredStrategy
의 인스턴스를 만듭니다.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
참고: MultiWorkerMirroredStrategy.init()
가 호출될 경우, TF_CONFIG
는 파싱되며 TensorFlow GRPC 서버는 구동됩니다. 따라서 tf.distribute.Strategy
인스턴스 생성 전에 TF_CONFIG
환경변수가 설정되어야 합니다. TF_CONFIG
가 아직 설정되지 않았으므로 상기 전략은 사실상 단일 워커 훈련입니다.
tf.distribute.Strategy
API를 tf.keras
에 통합하면 여러 작업자에게 훈련을 분배하기 위해 유일하게 수행하는 변경은 모델 구축 및 model.compile()
호출을 strategy.scope()
로 감싸는 것입니다. 분산 전략의 범위에 따라 변수가 생성되는 방식과 위치가 결정되고 MultiWorkerMirroredStrategy
의 경우, 생성된 변수는 MirroredVariable
이며 각 작업자에 복제됩니다.
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
참고: 현재, MultiWorkerMirroredStrategy
에는 전략 인스턴스가 생성된 후 TensorFlow ops를 생성해야 하는 제한이 있습니다. RuntimeError: Collective ops must be configured at program startup
가 발생하면 프로그램 맨 처음에 MultiWorkerMirroredStrategy
인스턴스를 만들고 전략이 인스턴스화된 후에 ops를 생성할 수 있는 코드를 넣으세요.
실제로 MultiWorkerMirroredStrategy
로 실행하려면 작업자 프로세스를 실행하고 TF_CONFIG
를 전달해야 합니다.
앞서 작성한 mnist_setup.py
파일과 마찬가지로 다음은 각 작업자가 실행할 main.py
입니다.
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist_setup
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
strategy = tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py
위의 코드 스니펫에서 Dataset.batch
로 전달되는 global_batch_size
가 per_worker_batch_size * num_workers
로 설정된다는 점에 주목하세요. 그러면 각 작업자가 작업자 수에 관계없이 per_worker_batch_size
예제 배치를 처리하게 됩니다.
현재 디렉터리에는 이제 두 Python 파일이 모두 포함됩니다.
ls *.py
main.py mnist_setup.py
TF_CONFIG
를 JSON으로 직렬화하고 환경 변수에 추가합니다.
os.environ['TF_CONFIG'] = json.dumps(tf_config)
이제, main.py
를 실행하고 TF_CONFIG
를 사용하는 작업자 프로세스를 시작할 수 있습니다.
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
위의 명령에서 몇 가지 주의할 사항이 있습니다.
- 일부 bash 명령을 실행하기 위해 노트북 "매직"인
%%bash
가 사용됩니다. --bg
플래그를 사용하여 백그라운드에서bash
프로세스를 실행합니다. 이 작업자는 종료되지 않기 때문입니다. 시작하기 전에 모든 작업자를 기다립니다.
백그라운드 작업자 프로세스는 이 노트북에 출력을 인쇄하지 않으므로 &>
는 출력을 파일로 리디렉션하고 나중에 로그 파일에서 발생한 상황을 검사할 수 있습니다.
따라서 프로세스가 시작될 때까지 몇 초 동안 기다리세요.
import time
time.sleep(10)
이제, 지금까지 작업자의 로그 파일에 출력된 내용을 검사합니다.
cat job_0.log
2022-12-15 01:43:45.590694: E tensorflow/tsl/lib/monitoring/collection_registry.cc:81] Cannot register 2 metrics with the same name: /tensorflow/core/bfc_allocator_delay 2022-12-15 01:43:47.627300: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
로그 파일의 마지막 줄은 다음과 같아야 합니다: Started server with target: grpc://localhost:12345
. 이제 첫 번째 작업자가 준비되었으며 다른 모든 작업자가 계속 진행할 준비가 되기를 기다립니다.
따라서 두 번째 작업자 프로세스가 시작되도록 tf_config
를 업데이트합니다.
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
두 번째 작업자를 시작합니다. 모든 작업자가 활성 상태이므로 훈련이 시작됩니다(따라서 이 프로세스를 백그라운드로 처리할 필요가 없음).
python main.py
2022-12-15 01:43:55.790767: E tensorflow/tsl/lib/monitoring/collection_registry.cc:81] Cannot register 2 metrics with the same name: /tensorflow/core/bfc_allocator_delay 2022-12-15 01:43:57.912229: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-12-15 01:43:59.021660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } attr { key: "replicate_on_split" value { b: false } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-12-15 01:43:59.325746: W tensorflow/core/framework/dataset.cc:807] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch 1/3 70/70 [==============================] - 6s 44ms/step - loss: 2.2751 - accuracy: 0.1901 Epoch 2/3 70/70 [==============================] - 3s 44ms/step - loss: 2.2069 - accuracy: 0.4153 Epoch 3/3 70/70 [==============================] - 3s 44ms/step - loss: 2.1303 - accuracy: 0.5448
첫 번째 작업자가 작성한 로그를 다시 확인하면 작업자가 해당 모델 훈련에 참여했음을 알 수 있습니다.
cat job_0.log
2022-12-15 01:43:45.590694: E tensorflow/tsl/lib/monitoring/collection_registry.cc:81] Cannot register 2 metrics with the same name: /tensorflow/core/bfc_allocator_delay 2022-12-15 01:43:47.627300: E tensorflow/compiler/xla/stream_executor/cuda/cuda_driver.cc:267] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-12-15 01:43:59.020150: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } attr { key: "replicate_on_split" value { b: false } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-12-15 01:43:59.325986: W tensorflow/core/framework/dataset.cc:807] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch 1/3 70/70 [==============================] - 6s 44ms/step - loss: 2.2751 - accuracy: 0.1901 Epoch 2/3 70/70 [==============================] - 3s 44ms/step - loss: 2.2069 - accuracy: 0.4153 Epoch 3/3 70/70 [==============================] - 3s 44ms/step - loss: 2.1303 - accuracy: 0.5448
참고: 단일 시스템에서 여러 작업자를 실행하면 오버헤드만 추가되기 때문에 이 튜토리얼의 시작 부분에서 실행한 테스트보다 느리게 실행될 수 있습니다. 여기서 목표는 훈련 시간을 개선하는 것이 아니라 다중 작업자 훈련의 예를 제공하는 것입니다.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
심층적 다중 작업자 훈련
지금까지 기본적인 다중 작업자 설정을 수행하는 방법을 배웠습니다. 튜토리얼의 나머지 부분에서는 실제 사용 사례에 유용하거나 중요할 수 있는 다른 요소에 대해 자세히 설명합니다.
데이터세트 샤딩
다중 작업자 훈련 중 수렴과 성능을 보장하려면 데이터 샤딩이 필요합니다.
이전 섹션의 예는 tf.distribute.Strategy
API에서 제공하는 기본 자동 샤딩을 이용합니다. tf.data.experimental.DistributeOptions
의 tf.data.experimental.AutoShardPolicy
를 설정하여 샤딩을 제어할 수 있습니다.
자동 샤딩에 대한 자세한 내용은 분산 입력 가이드를 참조하세요.
다음은 각 복제본이 모든 예를 처리하도록 자동 샤딩을 해제하는 방법을 보여주는 간단한 예입니다(권장하지 않음).
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
평가
validation_data
를 Model.fit
에도 전달하면 각 epoch에 대해 훈련과 평가가 번갈아 이루어집니다. 평가 작업은 동일한 작업자 집합에 배포되고 그 결과가 집계되어 모든 작업자가 사용할 수 있습니다.
훈련과 마찬가지로 검증 데이터세트는 파일 수준에서 자동으로 샤딩됩니다. 검증 데이터세트에서 전역 배치 크기를 설정하고 validation_steps
를 설정해야 합니다.
평가를 위해 반복되는 데이터세트(tf.data.Dataset.repeat
호출)를 권장합니다.
또는 주기적으로 체크포인트를 읽고 평가를 실행하는 다른 작업을 생성할 수도 있습니다. 이것이 Estimator가 하는 일입니다. 그러나 이는 권장되는 평가 방법이 아니므로 자세한 내용은 생략합니다.
공연
다중 작업자 훈련의 성능을 조정하기 위해 다음을 시도할 수 있습니다.
tf.distribute.MultiWorkerMirroredStrategy
는 다수의 집합 통신 구현을 제공합니다.RING
은 호스트 사이의 통신 레이어로 gRPC를 사용하여 링 기반 집합체를 구현합니다.NCCL
은 NVIDIA 집합 통신 라이브러리를 사용하여 집합체를 구현합니다.AUTO
는 선택을 런타임으로 미룹니다.
최선의 집합체 구현 선택은 클러스터의 GPU 수, GPU 유형 및 네트워크 상호 연결에 따라 다릅니다. 자동 선택을 재정의하려면
MultiWorkerMirroredStrategy
생성자의communication_options
매개변수를 지정합니다. 예를 들면 다음과 같습니다.communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CommunicationImplementation.NCCL)
가능한 경우 변수를
tf.float
로 캐스팅합니다.- 공식 ResNet 모델에는 이를 수행하는 방법의 예가 포함되어 있습니다.
내결함성
동기식 훈련에서 작업자 중 하나가 실패하고 실패 복구 메커니즘이 없으면 클러스터가 실패합니다.
tf.distribute.Strategy
와 함께 Keras를 사용하면 작업자가 작동하지 않거나 불안정한 경우 내결함성의 이점이 있습니다. 이를 위해 이전에 실패했거나 선점한 인스턴스를 다시 시작할 때 훈련 상태가 복구되도록 선택한 분산 파일 시스템에서 훈련 상태를 유지할 수 있습니다.
작업자를 사용할 수 없게 되면 다른 작업자가 실패합니다(시간 초과 후). 이러한 경우 사용할 수 없는 작업자는 물론 실패한 다른 작업자도 다시 시작해야 합니다.
참고: 이전에는 다중 작업자 훈련 작업 중 장애로 인해 훈련을 재시작할 경우 ModelCheckpoint
콜백이 훈련 상태를 복구하는 메커니즘을 제공했습니다. TensorFlow 팀은 일관성 있는 경험을 위해 단일 작업자 훈련 지원을 추가로 제공하고자 BackupAndRestore
콜백을 새롭게 도입하였으며, 기존 ModelCheckpoint
콜백의 내결함성 기능을 제거했습니다. 이제 해당 동작을 사용하는 애플리케이션은 새 BackupAndRestore
콜백으로 마이그레이션해야 합니다.
ModelCheckpoint
콜백
ModelCheckpoint
콜백은 더 이상 내결함성 기능을 제공하지 않습니다. 대신 BackupAndRestore
콜백을 사용하세요.
ModelCheckpoint
콜백은 여전히 체크포인트를 저장하는 데 사용할 수 있습니다. 하지만 이를 통해 훈련이 중단되거나 성공적으로 종료된 경우 체크포인트에서 훈련을 계속하기 위해 사용자가 수동으로 모델을 로드해야 합니다.
필요한 경우, ModelCheckpoint
콜백 외부에서 모델/가중치를 저장하고 복원하도록 선택할 수 있습니다.
모델 저장 및 로드하기
model.save
또는 tf.saved_model.save
를 사용하여 모델을 저장하려면 작업자마다 저장 대상이 달라야 합니다.
- 수석 작업자가 아닌 경우 모델을 임시 디렉터리에 저장해야 합니다.
- 수석 작업자의 경우 제공된 모델 디렉터리에 저장해야 합니다.
여러 작업자가 동일한 위치에 쓰려고 하여 오류가 발생하는 것을 방지하기 위해 작업자의 임시 디렉터리는 고유해야 합니다.
모든 디렉터리에 저장되는 모델은 동일하며, 일반적으로 수석 작업자가 저장한 모델만 복원 또는 제공을 위해 참조해야 합니다.
훈련이 완료되면 작업자가 만든 임시 디렉터리를 삭제하는 정리 논리가 있어야 합니다.
수석 작업자와 작업자에서 동시에 저장하는 이유는 체크포인트 동안 변수를 집계해야 할 수 있고, 이를 위해 수석 작업자와 작업자가 모두 allreduce 통신 프로토콜에 참여해야 하기 때문입니다. 반면, 수석 작업자와 작업자를 같은 모델 디렉터리에 저장하도록 하면 경합으로 인해 오류가 발생합니다.
MultiWorkerMirroredStrategy
를 사용하면 프로그램이 모든 작업자에서 실행되고 현재 작업자가 책임 작업자인지 여부를 알기 위해 task_type
및 task_id
특성을 갖는 클러스터 확인자 객체가 이용됩니다.
task_type
은 현재 작업이 무엇인지 알려줍니다(예:'worker'
).task_id
는 작업자의 식별자를 알려줍니다.task_id == 0
인 작업자가 수석 작업자로 지정됩니다.
아래 코드 스니펫에서 write_filepath
함수는 작업자의 task_id
에 따라 쓸 파일 경로를 제공합니다.
- 수석 작업자(
task_id == 0
)의 경우 원래 파일 경로에 씁니다. - 다른 작업자의 경우 쓸 디렉터리 경로에
task_id
가 있는 임시 디렉터리인temp_dir
가 만들어집니다.
model_path = '/tmp/keras-model'
def _is_chief(task_type, task_id):
# Note: there are two possible `TF_CONFIG` configurations.
# 1) In addition to `worker` tasks, a `chief` task type is use;
# in this case, this function should be modified to
# `return task_type == 'chief'`.
# 2) Only `worker` task type is used; in this case, worker 0 is
# regarded as the chief. The implementation demonstrated here
# is for this case.
# For the purpose of this Colab section, the `task_type` is `None` case
# is added because it is effectively run with only a single worker.
return (task_type == 'worker' and task_id == 0) or task_type is None
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
이제 저장할 준비가 되었습니다.
multi_worker_model.save(write_model_path)
WARNING:absl:Found untraced functions such as _jit_compiled_convolution_op, _update_step_xla while saving (showing 2 of 2). These functions will not be directly callable after loading. INFO:tensorflow:Assets written to: /tmp/keras-model/assets INFO:tensorflow:Assets written to: /tmp/keras-model/assets
위에서 설명한 것처럼 나중에 모델은 책임 작업자가 저장한 파일 경로에서만 로드되어야 됩니다. 따라서 비 책임 작업자가 저장한 임시 항목을 제거하세요.
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(os.path.dirname(write_model_path))
이제 로드할 때 편리한 tf.keras.models.load_model
API를 사용하여 추가 작업을 계속합니다.
여기에서는 단일 작업자만 사용하여 훈련을 로드하고 계속한다고 가정합니다. 이 경우 다른 strategy.scope()
내에서 tf.keras.models.load_model
을 호출하지 않습니다(앞서 정의한 바와 같이 strategy = tf.distribute.MultiWorkerMirroredStrategy()
임에 유의):
loaded_model = tf.keras.models.load_model(model_path)
# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2 20/20 [==============================] - 1s 8ms/step - loss: 2.3017 - accuracy: 0.0852 Epoch 2/2 20/20 [==============================] - 0s 7ms/step - loss: 2.2802 - accuracy: 0.1797 <keras.callbacks.History at 0x7f2717617a60>
체크포인트 저장 및 복원
한편, 체크포인트를 사용하면 전체 모델을 저장하지 않고도 모델의 가중치를 저장하고 복원할 수 있습니다.
여기에서는 모델을 추적하는 하나의 tf.train.Checkpoint
를 만들고, 최신 체크포인트만 보존되도록 이를 tf.train.CheckpointManager
로 관리합니다.
checkpoint_dir = '/tmp/ckpt'
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
CheckpointManager
가 설정되면 이제 수석이 아닌 작업자가 저장한 체크포인트를 저장하고 제거할 준비가 된 것입니다.
checkpoint_manager.save()
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(write_checkpoint_dir)
이제 모델을 복원해야 할 때 편리한 tf.train.latest_checkpoint
함수를 사용하여 저장된 최신 체크포인트를 찾을 수 있습니다. 체크포인트를 복원한 후 훈련을 계속할 수 있습니다.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-12-15 01:44:15.096852: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } attr { key: "replicate_on_split" value { b: false } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/2 2022-12-15 01:44:15.387474: W tensorflow/core/framework/dataset.cc:807] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. 20/20 [==============================] - 3s 7ms/step - loss: 2.2985 - accuracy: 0.0914 Epoch 2/2 20/20 [==============================] - 0s 7ms/step - loss: 2.2800 - accuracy: 0.1586 <keras.callbacks.History at 0x7f280e3bde50>
BackupAndRestore
콜백
tf.keras.callbacks.BackupAndRestore
콜백은 BackupAndRestore
에 대한 backup_dir
인수 아래에 임시 체크포인트 파일의 모델 및 현재 훈련 상태를 백업하여 내결함성 기능을 제공합니다.
참고: Tensorflow 2.9에서는 현재 모델과 훈련 상태가 epoch 경계에서 백업됩니다. tf-nightly
버전과 TensorFlow 2.10부터 BackupAndRestore
콜백은 epoch 또는 단계 경계에서 모델과 훈련 상태를 백업할 수 있습니다. BackupAndRestore
는 선택적 save_freq
인수를 허용합니다. save_freq
는 'epoch'
또는 int
값을 허용합니다. save_freq
가 'epoch'
로 설정되면 모델은 epoch마다 백업됩니다. save_freq
가 0
보다 큰 정수 값으로 설정되면 save_freq
개의 배치마다 모델이 백업됩니다.
작업이 중단되고 다시 시작되면 BackupAndRestore
콜백이 마지막 체크포인트를 복원하고 훈련 상태가 마지막으로 저장된 epoch 및 단계의 시작부터 훈련을 계속할 수 있습니다.
이를 사용하려면 Model.fit
호출에서 tf.keras.callbacks.BackupAndRestore
의 인스턴스를 제공하세요.
BackupAndRestore
콜백은 CheckpointManager
를 사용하여 훈련 상태를 저장하고 복원하는데, 이 때 최신 체크포인트와 함께 기존 체크포인트를 추적하는 체크포인트라는 파일이 생성됩니다. 이러한 이유로 이름 충돌을 피하기 위해 backup_dir
는 다른 체크포인트를 저장하는 데 재사용되지 않아야 합니다.
현재 BackupAndRestore
콜백은 전략이 없는 단일 작업자 훈련(MirroredStrategy
)과 MultiWorkerMirroredStrategy
를 사용하는 다중 작업자 훈련을 지원합니다.
현재 BackupAndRestore
콜백은 전략이 없는 단일 작업자 훈련(MirroredStrategy
)과 MultiWorkerMirroredStrategy
를 사용하는 다중 작업자 훈련을 지원합니다.
다음은 다중 작업자 훈련 및 단일 작업자 훈련에 대한 두 가지 예제입니다.
# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback. The training state
# is backed up at epoch boundaries by default.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
2022-12-15 01:44:18.504579: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } attr { key: "replicate_on_split" value { b: false } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/3 70/70 [==============================] - 3s 9ms/step - loss: 2.2887 - accuracy: 0.1348 Epoch 2/3 70/70 [==============================] - 1s 9ms/step - loss: 2.2458 - accuracy: 0.3402 Epoch 3/3 70/70 [==============================] - 1s 9ms/step - loss: 2.1951 - accuracy: 0.4759 <keras.callbacks.History at 0x7f280f4309a0>
BackupAndRestore
콜백의 save_freq
인수가 'epoch'
로 설정된 경우 모델은 epoch마다 백업됩니다.
# The training state is backed up at epoch boundaries because `save_freq` is
# set to `epoch`.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
2022-12-15 01:44:23.521369: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } attr { key: "replicate_on_split" value { b: false } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/3 70/70 [==============================] - 3s 9ms/step - loss: 2.2788 - accuracy: 0.1493 Epoch 2/3 70/70 [==============================] - 1s 9ms/step - loss: 2.2026 - accuracy: 0.3723 Epoch 3/3 70/70 [==============================] - 1s 9ms/step - loss: 2.1214 - accuracy: 0.5451 <keras.callbacks.History at 0x7f280eb3adc0>
참고: 다음 코드 블록은 Tensorflow 2.10이 출시될 때까지 tf-nightly
에서만 사용할 수 있는 기능을 사용합니다.
BackupAndRestore
콜백의 save_freq
인수가 0
보다 큰 정수 값으로 설정되면 save_freq
의 배치마다 모델이 백업됩니다.
# The training state is backed up at every 30 steps because `save_freq` is set
# to an integer value of `30`.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup', save_freq=30)]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
2022-12-15 01:44:28.537486: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:784] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } attr { key: "replicate_on_split" value { b: false } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/3 70/70 [==============================] - 3s 10ms/step - loss: 2.2870 - accuracy: 0.1674 Epoch 2/3 70/70 [==============================] - 1s 10ms/step - loss: 2.2313 - accuracy: 0.2991 Epoch 3/3 70/70 [==============================] - 1s 11ms/step - loss: 2.1604 - accuracy: 0.4556 <keras.callbacks.History at 0x7f280ea3ec40>
BackupAndRestore
에서 지정한 backup_dir
의 디렉터리를 살펴보면 일시적으로 생성된 일부 체크포인트 파일을 발견할 수 있습니다. 이러한 파일은 이전에 손실된 인스턴스를 복구하는 데 필요하며 훈련이 성공적으로 종료되면 Model.fit
마지막에 라이브러리에서 제거됩니다.
참고: 현재, BackupAndRestore
콜백은 강제 실행 모드만 지원합니다. 그래프 모드에서 위의 모델 저장 및 로드 섹션에서 설명한 대로 Model.save
/tf.saved_model.save
및 tf.keras.models.load_model
을 각각 모델 저장과 복원에 사용하고 훈련 중 Model.fit
에 initial_epoch
를 제공하는 방법을 고려해 보세요.
추가 자료
- TensorFlow에서 분산 훈련하기 가이드는 사용 가능한 분산 전략을 간략히 소개합니다.
- Keras 및 MultiWorkerMirroredStrategy를 이용한 사용자 지정 훈련 루프 튜토리얼은 Keras 및 사용자 지정 훈련 루프와 함께
MultiWorkerMirroredStrategy
를 이용하는 방법을 보여줍니다. - 여러 분산 전략을 실행하도록 구성할 수 있는 공식 모델을 확인해 보세요.
- tf.function으로 성능 향상 가이드는 TensorFlow 모델의 성능을 최적화하는 데 사용할 수 있는 TensorFlow 프로파일러와 같은 다른 전략 및 도구에 대한 정보를 제공합니다.