TensorFlow.org'da görüntüleyin | Google Colab'da çalıştırın | Kaynağı GitHub'da görüntüleyin | Not defterini indir |
genel bakış
Bu öğretici, tf.distribute.Strategy
API'sini (özellikle tf.distribute.MultiWorkerMirroredStrategy
sınıfını) kullanarak bir Keras modeli ve Model.fit
API'si ile çok çalışanlı dağıtılmış eğitimin nasıl gerçekleştirileceğini gösterir. Bu stratejinin yardımıyla, tek bir çalışan üzerinde çalışacak şekilde tasarlanmış bir Keras modeli, minimum kod değişikliği ile birden çok çalışan üzerinde sorunsuz bir şekilde çalışabilir.
tf.distribute.Strategy
API'lerini daha derinlemesine anlamak isteyenler için, TensorFlow'un desteklediği dağıtım stratejilerine genel bir bakış için TensorFlow kılavuzundaki Dağıtılmış eğitim mevcuttur.
MultiWorkerMirroredStrategy
ile MultiWorkerMirroredStrategy ve özel eğitim döngüsünün nasıl kullanılacağını öğrenmek için Keras ve MultiWorkerMirrorredStrategy ile Özel eğitim döngüsüne bakın.
Bu öğreticinin amacının, iki işçi ile minimal çok işçili bir örnek göstermek olduğunu unutmayın.
Kurmak
Bazı gerekli ithalatlarla başlayın:
import json
import os
import sys
TensorFlow'u içe aktarmadan önce ortamda birkaç değişiklik yapın:
- Tüm GPU'ları devre dışı bırakın. Bu, aynı GPU'yu kullanmaya çalışan çalışanların neden olduğu hataları önler. Gerçek dünyadaki bir uygulamada, her işçi farklı bir makinede olacaktır.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
-
TF_CONFIG
ortam değişkenini sıfırlayın (bunun hakkında daha fazla bilgi edineceksiniz):
os.environ.pop('TF_CONFIG', None)
- Geçerli dizinin Python'un yolunda olduğundan emin olun; bu, not defterinin daha sonra
%%writefile
tarafından yazılan dosyaları içe aktarmasına olanak tanır:
if '.' not in sys.path:
sys.path.insert(0, '.')
Şimdi TensorFlow'u içe aktarın:
import tensorflow as tf
Veri kümesi ve model tanımı
Ardından, basit bir model ve veri kümesi kurulumuyla bir mnist_setup.py
dosyası oluşturun. Bu Python dosyası, bu öğreticideki çalışan işlemler tarafından kullanılacaktır:
%%writefile mnist_setup.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the [0, 255] range.
# You need to convert them to float32 with values in the [0, 1] range.
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
return train_dataset
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
tutucu6 l10n-yerWriting mnist_setup.py
Tek bir işçi üzerinde model eğitimi
Modeli az sayıda dönem için eğitmeyi deneyin ve her şeyin doğru çalıştığından emin olmak için tek bir çalışanın sonuçlarını gözlemleyin. Eğitim ilerledikçe, kayıp düşmeli ve doğruluk artmalıdır.
import mnist_setup
batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
tutucu8 l10n-yerDownloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz 11493376/11490434 [==============================] - 0s 0us/step 11501568/11490434 [==============================] - 0s 0us/step 2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected Epoch 1/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788 Epoch 2/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185 Epoch 3/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795 <keras.callbacks.History at 0x7f666a2e4510>
Çok çalışanlı yapılandırma
Şimdi çok işçili eğitim dünyasına girelim.
İşler ve görevler içeren bir küme
TensorFlow'da dağıtılmış eğitim şunları içerir: birkaç işi olan bir 'cluster'
ve işlerin her birinin bir veya daha fazla 'task'
olabilir.
Her biri muhtemelen farklı bir role sahip olan birden çok makinede eğitim için TF_CONFIG
yapılandırma ortamı değişkenine ihtiyacınız olacak. TF_CONFIG
, kümenin parçası olan her çalışan için küme yapılandırmasını belirtmek için kullanılan bir JSON dizesidir.
Bir TF_CONFIG
değişkeninin iki bileşeni vardır: 'cluster'
ve 'task'
.
Bir
'cluster'
tüm işçiler için aynıdır ve'worker'
veya'chief'
gibi farklı iş türlerinden oluşan bir dikte olan eğitim kümesi hakkında bilgi sağlar.-
tf.distribute.MultiWorkerMirroredStrategy
ile çok çalışanlı eğitimde, normal bir'worker'
işçi'nin yaptıklarına ek olarak, bir kontrol noktası kaydetme ve TensorBoard için bir özet dosyası yazma gibi sorumlulukları üstlenen genellikle bir'worker'
vardır. Bu tür'worker'
, baş işçi olarak adlandırılır ('chief'
bir iş adıyla). -
'chief'
için'index'
0
atanması alışılmış bir durumdur (aslında,tf.distribute.Strategy
bu şekilde uygulanır).
-
Bir
'task'
mevcut görev hakkında bilgi sağlar ve her çalışan için farklıdır. Bu çalışanın'type'
ve'index'
belirtir.
Aşağıda örnek bir yapılandırma verilmiştir:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
İşte bir JSON dizesi olarak serileştirilmiş aynı TF_CONFIG
:
json.dumps(tf_config)
tutucu11 l10n-yer'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
tf_config
yalnızca yerel bir değişken olduğunu unutmayın. Bir eğitim yapılandırması için kullanabilmek için, bu dict'in bir JSON olarak serileştirilmesi ve bir TF_CONFIG
ortam değişkenine yerleştirilmesi gerekir.
Yukarıdaki örnek yapılandırmada, 'type'
görevini 'worker'
olarak ve 'index'
görevini 0
olarak ayarladınız. Bu nedenle, bu makine ilk işçidir. 'chief'
işçi olarak atanacak ve diğerlerinden daha fazla iş yapacak.
Örnekleme amacıyla, bu öğretici, bir localhost
üzerinde iki çalışanla bir TF_CONFIG
değişkenini nasıl kurabileceğinizi gösterir.
Pratikte, harici IP adreslerinde/portlarında birden çok işçi oluşturacak ve buna göre her bir çalışan için bir TF_CONFIG
değişkeni ayarlayacaksınız.
Bu öğreticide iki işçi kullanacaksınız:
- İlk (
'chief'
) işçininTF_CONFIG
yukarıda gösterilmiştir. - İkinci çalışan için
tf_config['task']['index']=1
değerini ayarlayacaksınız
Not defterlerinde ortam değişkenleri ve alt süreçler
Alt işlemler, ortam değişkenlerini üstlerinden devralır.
Örneğin, bu Jupyter Notebook işleminde bir ortam değişkenini aşağıdaki gibi ayarlayabilirsiniz:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
Ardından, ortam değişkenine bir alt işlemden erişebilirsiniz:
echo ${GREETINGS}
tutucu14 l10n-yerHello TensorFlow!
Sonraki bölümde, TF_CONFIG
çalışan alt işlemlere geçirmek için benzer bir yöntem kullanacaksınız. Gerçek dünya senaryosunda, işlerinizi bu şekilde başlatmazsınız, ancak bu örnekte bu yeterlidir.
Doğru stratejiyi seçin
TensorFlow'da dağıtılmış eğitimin iki ana biçimi vardır:
- Eğitim adımlarının çalışanlar ve kopyalar arasında eşitlendiği eşzamanlı eğitim ve
- Eğitim adımlarının kesin olarak eşitlenmediği zaman uyumsuz eğitim (örneğin, parametre sunucusu eğitimi ).
Bu öğretici, tf.distribute.MultiWorkerMirroredStrategy
örneğini kullanarak eşzamanlı çok çalışan eğitiminin nasıl gerçekleştirileceğini gösterir.
MultiWorkerMirroredStrategy
, tüm çalışanlarda her cihazda modelin katmanlarındaki tüm değişkenlerin kopyalarını oluşturur. Degradeleri toplamak ve değişkenleri senkronize tutmak için toplu iletişim için bir TensorFlow op olan CollectiveOps
kullanır. tf.distribute.Strategy
kılavuzunda bu strateji hakkında daha fazla ayrıntı bulunmaktadır.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
tutucu16 l10n-yerWARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
MultiWorkerMirroredStrategy
, tf.distribute.experimental.CommunicationOptions
parametresi aracılığıyla birden çok uygulama sağlar: 1) RING
, ana bilgisayarlar arası iletişim katmanı olarak gRPC kullanarak halka tabanlı kolektifleri uygular; 2) NCCL
, kolektifleri uygulamak için NVIDIA Toplu İletişim Kitaplığını kullanır; ve 3) AUTO
, seçimi çalışma zamanına erteler. Toplu uygulamanın en iyi seçimi, GPU'ların sayısına ve türüne ve kümedeki ağ ara bağlantısına bağlıdır.
Modeli eğit
tf.distribute.Strategy
API'sinin tf.keras
entegrasyonuyla, eğitimi birden çok çalışana dağıtmak için yapacağınız tek değişiklik, model oluşturma ve model.compile()
çağrısını strategy.scope()
içine dahil etmektir. Dağıtım stratejisinin kapsamı, değişkenlerin nasıl ve nerede oluşturulacağını belirler ve MultiWorkerMirroredStrategy
durumunda, oluşturulan değişkenler MirroredVariable
'lardır ve her bir çalışan üzerinde çoğaltılırlar.
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
Gerçekten MultiWorkerMirroredStrategy
ile çalışmak için çalışan süreçleri çalıştırmanız ve onlara bir TF_CONFIG
gerekir.
Daha önce yazılan mnist_setup.py
dosyası gibi, her bir çalışanın çalıştıracağı main.py
dosyası şu şekildedir:
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist_setup
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
strategy = tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
tutucu19 l10n-yerWriting main.py
Yukarıdaki kod parçacığında, global_batch_size
geçirilen Dataset.batch
per_worker_batch_size * num_workers
olarak ayarlandığını unutmayın. Bu, çalışan sayısından bağımsız olarak her çalışanın per_worker_batch_size
örneklerini toplu olarak işlemesini sağlar.
Geçerli dizin artık her iki Python dosyasını da içeriyor:
ls *.py
tutucu21 l10n-yermain.py mnist_setup.py
Böylece, TF_CONFIG
json-seri hale getirin ve onu ortam değişkenlerine ekleyin:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Şimdi, TF_CONFIG
main.py
bir çalışan işlemi başlatabilirsiniz:
# first kill any previous runs
%killbgscripts
All background processes were killed.-yer tutucu25 l10n-yer
python main.py &> job_0.log
Yukarıdaki komutla ilgili dikkat edilmesi gereken birkaç nokta vardır:
- Bazı bash komutlarını çalıştırmak için bir dizüstü bilgisayar "sihri" olan
%%bash
kullanır. - Bu çalışan sonlandırılmayacağından,
bash
işlemini arka planda çalıştırmak için--bg
bayrağını kullanır. Başlamadan önce tüm çalışanları bekler.
Arka planda çalışan işlem, çıktıyı bu not defterine yazdırmaz, bu nedenle &>
çıktısını bir dosyaya yönlendirir, böylece daha sonra bir günlük dosyasında ne olduğunu inceleyebilirsiniz.
Bu nedenle, işlemin başlaması için birkaç saniye bekleyin:
import time
time.sleep(10)
Şimdi, işçinin günlük dosyasına şu ana kadar ne çıktı olduğunu inceleyin:
cat job_0.log
tutucu28 l10n-yer2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Günlük dosyasının son satırı şunları söylemelidir Started server with target: grpc://localhost:12345
. İlk işçi şimdi hazırdır ve diğer tüm işçi(ler)in ilerlemeye hazır olmasını beklemektedir.
Bu nedenle, ikinci çalışanın alması için tf_config
güncelleyin:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
İkinci işçiyi çalıştırın. Bu, tüm çalışanlar aktif olduğu için eğitimi başlatacaktır (bu nedenle bu süreci arka plana atmaya gerek yoktur):
python main.py
tutucu31 l10n-yerEpoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901 2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
İlk çalışan tarafından yazılan günlükleri yeniden kontrol ederseniz, o modelin eğitimine katıldığını öğreneceksiniz:
cat job_0.log
tutucu33 l10n-yer2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
Şaşırtıcı olmayan bir şekilde, bu, bu öğreticinin başındaki test çalışmasından daha yavaş çalıştı.
Birden fazla işçiyi tek bir makinede çalıştırmak yalnızca ek yük ekler.
Buradaki amaç, eğitim süresini iyileştirmek değil, sadece çok işçili eğitime bir örnek vermekti.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
tutucu35 l10n-yerAll background processes were killed.
Çok çalışanlı derinlemesine eğitim
Şimdiye kadar, temel bir çok çalışanlı kurulumun nasıl gerçekleştirileceğini öğrendiniz.
Eğitimin geri kalanında, gerçek kullanım durumları için yararlı veya önemli olabilecek diğer faktörleri ayrıntılı olarak öğreneceksiniz.
Veri kümesi parçalama
Çok çalışanlı eğitimde, yakınsama ve performansı sağlamak için veri kümesi parçalamaya ihtiyaç vardır.
Önceki bölümdeki örnek, tf.distribute.Strategy
API tarafından sağlanan varsayılan otomatik parçalamaya dayanmaktadır. tf.data.experimental.DistributeOptions
öğesinin tf.data.experimental.AutoShardPolicy
öğesini ayarlayarak parçalamayı kontrol edebilirsiniz.
Otomatik parçalama hakkında daha fazla bilgi edinmek için Dağıtılmış giriş kılavuzuna bakın.
Her bir kopyanın her örneği işlemesi için otomatik parçalamanın nasıl kapatılacağına dair hızlı bir örnek ( önerilmez ):
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
Değerlendirme
validation_data
Model.fit
, her dönem için eğitim ve değerlendirme arasında geçiş yapar. validation_data
alan değerlendirme, aynı çalışan grubuna dağıtılır ve değerlendirme sonuçları toplanır ve tüm çalışanlar için kullanılabilir.
Eğitime benzer şekilde, doğrulama veri kümesi dosya düzeyinde otomatik olarak paylaşılır. Doğrulama veri kümesinde genel bir toplu iş boyutu ayarlamanız ve validation_steps
değerini ayarlamanız gerekir.
Değerlendirme için tekrarlanan bir veri kümesi de önerilir.
Alternatif olarak, düzenli aralıklarla kontrol noktalarını okuyan ve değerlendirmeyi çalıştıran başka bir görev de oluşturabilirsiniz. Tahmincinin yaptığı budur. Ancak bu, değerlendirme yapmak için önerilen bir yol değildir ve bu nedenle ayrıntıları atlanmıştır.
Verim
Artık MultiWorkerMirroredStrategy ile birden çok çalışanda çalışacak şekilde ayarlanmış bir MultiWorkerMirroredStrategy
var.
Çok işçili eğitimin performansını ayarlamak için aşağıdakileri deneyebilirsiniz:
tf.distribute.MultiWorkerMirroredStrategy
birden çok toplu iletişim uygulaması sağlar:-
RING
, ana bilgisayarlar arası iletişim katmanı olarak gRPC kullanan halka tabanlı kolektifleri uygular. -
NCCL
, kolektifleri uygulamak için NVIDIA Toplu İletişim Kitaplığını kullanır. -
AUTO
, seçimi çalışma zamanına erteler.
Toplu uygulamanın en iyi seçimi, GPU'ların sayısına, GPU'ların türüne ve kümedeki ağ ara bağlantısına bağlıdır. Otomatik seçimi geçersiz kılmak için
MultiWorkerMirroredStrategy
communication_options
parametresini belirtin. Örneğin:communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
-
Mümkünse değişkenleri
tf.float
:- Resmi ResNet modeli, bunun nasıl yapılabileceğinin bir örneğini içerir.
Hata toleransı
Eşzamanlı eğitimde, çalışanlardan biri başarısız olursa küme başarısız olur ve herhangi bir arıza giderme mekanizması mevcut değildir.
tf.distribute.Strategy
ile kullanmak, çalışanların öldüğü veya başka bir şekilde istikrarsız olduğu durumlarda hata toleransı avantajıyla birlikte gelir. Bunu, seçtiğiniz dağıtılmış dosya sisteminde eğitim durumunu koruyarak yapabilirsiniz, böylece daha önce başarısız olan veya önceden alınan örneğin yeniden başlatılmasıyla eğitim durumu kurtarılır.
Bir işçi müsait olmadığında, diğer işçiler başarısız olur (muhtemelen bir zaman aşımından sonra). Bu gibi durumlarda, uygun olmayan çalışanın ve başarısız olan diğer çalışanların yeniden başlatılması gerekir.
ModelKontrol noktası geri arama
ModelCheckpoint
geri araması artık hata toleransı işlevi sağlamamaktadır, lütfen bunun yerine BackupAndRestore
geri aramasını kullanın.
ModelCheckpoint
geri araması, kontrol noktalarını kaydetmek için hala kullanılabilir. Ancak bununla eğitim yarıda kaldıysa veya başarıyla tamamlandıysa, eğitime kontrol noktasından devam etmek için modeli manuel olarak yüklemek kullanıcıya aittir.
İsteğe bağlı olarak kullanıcı, ModelCheckpoint
geri çağırma dışında modeli/ağırlıkları kaydetmeyi ve geri yüklemeyi seçebilir.
Model kaydetme ve yükleme
model.save
veya tf.saved_model.save
kullanarak modelinizi kaydetmek için, kaydetme hedefinin her çalışan için farklı olması gerekir.
- Şef olmayan çalışanlar için modeli geçici bir dizine kaydetmeniz gerekecektir.
- Şef için sağlanan model dizinine kaydetmeniz gerekecek.
Birden çok çalışanın aynı konuma yazmaya çalışmasından kaynaklanan hataları önlemek için, çalışan üzerindeki geçici dizinlerin benzersiz olması gerekir.
Tüm dizinlerde kaydedilen model aynıdır ve tipik olarak, geri yükleme veya hizmet için yalnızca şef tarafından kaydedilen modele başvurulmalıdır.
Eğitiminiz tamamlandıktan sonra işçiler tarafından oluşturulan geçici dizinleri silen bir temizleme mantığına sahip olmalısınız.
Şef ve işçilerden aynı anda tasarruf etmenin nedeni, hem şefin hem de çalışanların allreduce iletişim protokolüne katılmasını gerektiren kontrol noktası sırasında değişkenleri topluyor olmanızdır. Öte yandan, şef ve işçilerin aynı model dizinine kaydetmesine izin vermek, çekişme nedeniyle hatalara neden olacaktır.
MultiWorkerMirroredStrategy
kullanarak, program her çalışan üzerinde çalıştırılır ve mevcut çalışanın şef olup olmadığını bilmek için, task_type
ve task_id
niteliklerine sahip küme çözümleyici nesnesinden yararlanır:
-
task_type
size mevcut işin ne olduğunu söyler (örneğin'worker'
). -
task_id
size çalışanın tanımlayıcısını söyler. -
task_id == 0
olan çalışan, baş çalışan olarak atanır.
Aşağıdaki kod parçacığında, write_filepath
işlevi, çalışanın task_id
bağlı olarak yazılacak dosya yolunu sağlar:
- Baş çalışan için (
task_id == 0
ile), orijinal dosya yoluna yazar. - Diğer çalışanlar için, dizin yolundaki
task_id
ile yazılacak geçici bir dizin (temp_dir
) oluşturur:
model_path = '/tmp/keras-model'
def _is_chief(task_type, task_id):
# Note: there are two possible `TF_CONFIG` configuration.
# 1) In addition to `worker` tasks, a `chief` task type is use;
# in this case, this function should be modified to
# `return task_type == 'chief'`.
# 2) Only `worker` task type is used; in this case, worker 0 is
# regarded as the chief. The implementation demonstrated here
# is for this case.
# For the purpose of this Colab section, the `task_type is None` case
# is added because it is effectively run with only a single worker.
return (task_type == 'worker' and task_id == 0) or task_type is None
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
Bununla, artık kaydetmeye hazırsınız:
multi_worker_model.save(write_model_path)
tutucu40 l10n-yer2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/keras-model/assets INFO:tensorflow:Assets written to: /tmp/keras-model/assets
Yukarıda açıklandığı gibi, daha sonra model sadece şefin kaydedildiği yoldan yüklenmelidir, bu yüzden şef olmayan işçilerin kaydettiği geçici olanları kaldıralım:
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(os.path.dirname(write_model_path))
Şimdi yükleme zamanı geldiğinde, uygun tf.keras.models.load_model
API'sini kullanalım ve daha fazla çalışmaya devam edelim.
Burada, eğitimi yüklemek ve devam ettirmek için yalnızca tek bir çalışanın kullanıldığını varsayın; bu durumda başka bir strategy.scope()
içinde tf.keras.models.load_model
öğesini çağırmazsınız (daha önce tanımlandığı gibi strategy = tf.distribute.MultiWorkerMirroredStrategy()
olduğunu unutmayın) ):
loaded_model = tf.keras.models.load_model(model_path)
# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
tutucu43 l10n-yerEpoch 1/2 20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773 <keras.callbacks.History at 0x7f6669989750>
Kontrol noktası kaydetme ve geri yükleme
Öte yandan, kontrol noktası, modelinizin ağırlıklarını kaydetmenize ve tüm modeli kaydetmenize gerek kalmadan onları geri yüklemenize olanak tanır.
Burada, modeli izleyen ve tf.train.CheckpointManager
tf.train.Checkpoint
böylece yalnızca en son kontrol noktası korunur:
checkpoint_dir = '/tmp/ckpt'
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
CheckpointManager
kurulduktan sonra, şef olmayan çalışanların kaydettiği kontrol noktalarını kaydetmeye ve kaldırmaya hazırsınız:
checkpoint_manager.save()
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(write_checkpoint_dir)
Şimdi, modeli geri yüklemeniz gerektiğinde, kullanışlı tf.train.latest_checkpoint
işlevini kullanarak kaydedilen en son kontrol noktasını bulabilirsiniz. Kontrol noktasını geri yükledikten sonra eğitime devam edebilirsiniz.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
tutucu47 l10n-yer2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/2 2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. 20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938 <keras.callbacks.History at 0x7f6669589850>
BackupAndRestore geri arama
tf.keras.callbacks.BackupAndRestore
geri araması, modeli ve geçerli dönem numarasını backup_dir bağımsız değişkeni altında backup_dir
geçici bir kontrol noktası dosyasına yedekleyerek hataya dayanıklılık işlevselliği BackupAndRestore
. Bu, her dönemin sonunda yapılır.
İşler kesintiye uğrayıp yeniden başladığında, geri arama son kontrol noktasını geri yükler ve eğitim kesintiye uğrayan dönemin başından itibaren devam eder. Kesintiden önce bitmemiş çağda halihazırda yapılmış olan herhangi bir kısmi eğitim, nihai model durumunu etkilememesi için atılacaktır.
Kullanmak için Model.fit
çağrısında bir tf.keras.callbacks.BackupAndRestore
örneği sağlayın.
MultiWorkerMirroredStrategy
ile, bir çalışan kesintiye uğrarsa, kesintiye uğrayan çalışan yeniden başlatılana kadar tüm küme duraklar. Diğer çalışanlar da yeniden başlatılır ve kesintiye uğrayan çalışan kümeye yeniden katılır. Ardından, her çalışan önceden kaydedilmiş olan kontrol noktası dosyasını okur ve önceki durumunu alır, böylece kümenin tekrar eşitlenmesine izin verir. Ardından eğitim devam ediyor.
BackupAndRestore
geri çağrısı, eğitim durumunu kaydetmek ve geri yüklemek için CheckpointManager
kullanır; bu, mevcut kontrol noktalarını en sonuncusuyla birlikte izleyen kontrol noktası adlı bir dosya oluşturur. Bu nedenle, ad çakışmasını önlemek için backup_dir
, diğer kontrol noktalarını depolamak için yeniden kullanılmamalıdır.
Şu anda BackupAndRestore
geri çağrısı, stratejisi olmayan tek çalışanlı eğitimi— MirroredStrategy
— ve MultiWorkerMirroredStrategy
ile çok çalışanlı eğitimi destekler.
Aşağıda hem çok işçili eğitim hem de tek işçili eğitim için iki örnek verilmiştir:
# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
tutucu49 l10n-yer2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/3 70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123 Epoch 2/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509 Epoch 3/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614 <keras.callbacks.History at 0x7f6669555d90>
BackupAndRestore içinde belirttiğiniz backup_dir
dizinini BackupAndRestore
, geçici olarak oluşturulmuş bazı kontrol noktası dosyaları fark edebilirsiniz. Bu dosyalar daha önce kaybolan örnekleri kurtarmak için gereklidir ve eğitiminizden başarılı bir şekilde çıkıldığında Model.fit
sonunda kitaplık tarafından kaldırılacaktır.
Ek kaynaklar
- TensorFlow kılavuzundaki Dağıtılmış eğitim , mevcut dağıtım stratejilerine genel bir bakış sağlar.
- Keras ve MultiWorkerMirrorredStrategy ile Özel eğitim döngüsü ,
MultiWorkerMirroredStrategy
ile nasıl kullanılacağını ve özel bir eğitim döngüsünü gösterir. - Birçoğu birden fazla dağıtım stratejisi yürütmek üzere yapılandırılabilen resmi modellere göz atın.
- tf.fonksiyon kılavuzu ile daha iyi performans, TensorFlow modellerinizin performansını optimize etmek için kullanabileceğiniz TensorFlow Profiler gibi diğer stratejiler ve araçlar hakkında bilgi sağlar.