TensorFlow.org'da görüntüleyin | Google Colab'da çalıştırın | Kaynağı GitHub'da görüntüleyin | Not defterini indir |
genel bakış
Parametre sunucusu eğitimi , birden çok makinede model eğitimini büyütmek için yaygın bir veri paralel yöntemidir.
Bir parametre sunucusu eğitim kümesi, çalışanlardan ve parametre sunucularından oluşur. Değişkenler parametre sunucularında oluşturulur ve her adımda çalışanlar tarafından okunur ve güncellenir. Varsayılan olarak, çalışanlar bu değişkenleri birbirleriyle senkronize etmeden bağımsız olarak okur ve günceller. Bu nedenle bazen parametre sunucusu tarzı eğitime eşzamansız eğitim adı verilir.
TensorFlow 2'de parametre sunucusu eğitimi, eğitim adımlarını (parametre sunucuları eşliğinde) binlerce çalışana kadar ölçeklenen bir kümeye dağıtan tf.distribute.experimental.ParameterServerStrategy
sınıfı tarafından desteklenir.
Desteklenen eğitim yöntemleri
Desteklenen iki ana eğitim yöntemi vardır:
- Eğitimin üst düzey bir soyutlamasını ve işlenmesini tercih ettiğinizde önerilen
Model.fit
API. - Özel eğitim döngüsü (daha fazla ayrıntı için Özel eğitim , Sıfırdan eğitim döngüsü yazma ve Keras ve MultiWorkerMirrorredStrategy ile Özel eğitim döngüsüne başvurabilirsiniz .) Eğitim döngüsünün ayrıntılarını tanımlamayı tercih ettiğinizde özel döngü eğitimi önerilir.
İşler ve görevler içeren bir küme
Tercih edilen API'den ( Model.fit
veya özel eğitim döngüsü) bağımsız olarak, TensorFlow 2'deki dağıtılmış eğitim şunları içerir: birkaç 'jobs'
içeren bir 'cluster'
ve işlerin her birinin bir veya daha fazla 'tasks'
olabilir.
Parametre sunucusu eğitimini kullanırken şunlara sahip olunması önerilir:
- Bir koordinatör işi (iş adı
chief
olan) - Birden çok işçi işi (iş adı
worker
); ve - Birden çok parametre sunucusu işi (iş adı
ps
)
Koordinatör kaynakları oluştururken, eğitim görevlerini gönderirken, kontrol noktaları yazarken ve görev başarısızlıklarıyla ilgilenirken, çalışanlar ve parametre sunucuları koordinatörden gelen istekleri dinleyen tf.distribute.Server
çalıştırır.
Model.fit
API ile parametre sunucusu eğitimi
Model.fit
API ile parametre sunucusu eğitimi, koordinatörün girdi olarak bir tf.distribute.experimental.ParameterServerStrategy
nesnesi ve bir tf.keras.utils.experimental.DatasetCreator
kullanmasını gerektirir. Stratejisiz veya diğer stratejilerle Model.fit
kullanımına benzer şekilde, iş akışı modelin oluşturulmasını ve derlenmesini, geri aramaların hazırlanmasını ve ardından bir Model.fit
aramasını içerir.
Özel bir eğitim döngüsü ile parametre sunucusu eğitimi
Özel eğitim döngüleriyle, tf.distribute.experimental.coordinator.ClusterCoordinator
sınıfı, koordinatör için kullanılan temel bileşendir.
-
ClusterCoordinator
sınıfının birtf.distribute.Strategy
nesnesiyle birlikte çalışması gerekir. - Bu
tf.distribute.Strategy
nesnesi, kümenin bilgilerini sağlamak için gereklidir ve tf.distribute.Strategy ile Özel eğitimde gösterildiği gibi bir eğitim adımı tanımlamak için kullanılır. -
ClusterCoordinator
nesnesi daha sonra bu eğitim adımlarının yürütülmesini uzaktaki çalışanlara gönderir. - Parametre sunucusu eğitimi için
ClusterCoordinator
birtf.distribute.experimental.ParameterServerStrategy
ile çalışması gerekir.
ClusterCoordinator
nesnesi tarafından sağlanan en önemli API, schedule
:
-
schedule
API'si birtf.function
ve hemen geleceğe benzer birRemoteValue
döndürür. - Kuyruğa alınan işlevler, arka plan iş parçacıklarında uzaktaki çalışanlara gönderilecek ve
RemoteValue
eşzamansız olarak doldurulacak. -
schedule
, çalışan ataması gerektirmediğinden, aktarılantf.function
. işlevi, mevcut herhangi bir çalışan üzerinde yürütülebilir. - Üzerinde yürütüldüğü çalışan tamamlanmadan önce kullanılamaz hale gelirse, işlev uygun başka bir çalışan üzerinde yeniden denenir.
- Bu gerçek ve işlevin yürütülmesinin atomik olmadığı gerçeği nedeniyle, bir işlev birden fazla kez yürütülebilir.
ClusterCoordinator
, uzak işlevleri göndermeye ek olarak, tüm çalışanlar üzerinde veri kümeleri oluşturmaya ve bir çalışan hatadan kurtulduğunda bu veri kümelerini yeniden oluşturmaya da yardımcı olur.
Eğitim kurulumu
Eğitim, Model.fit
ve özel eğitim döngüsü yollarına ayrılacak ve ihtiyaçlarınıza uygun olanı seçebilirsiniz. "X ile Eğitim" dışındaki bölümler her iki yol için de geçerlidir.
pip install portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
küme kurulumu
Yukarıda bahsedildiği gibi, bir parametre sunucusu eğitim kümesi, eğitim programınızı çalıştıran bir koordinatör görevi, TensorFlow sunucularını çalıştıran bir veya birkaç işçi ve parametre sunucusu görevi ( tf.distribute.Server
ve muhtemelen sepet değerlendirmesini çalıştıran ek bir değerlendirme görevi gerektirir. (aşağıdaki sepet değerlendirme bölümüne bakın). Bunları ayarlamak için gereksinimler şunlardır:
- Koordinatör görevinin, değerlendirici dışındaki tüm diğer TensorFlow sunucularının adreslerini ve bağlantı noktalarını bilmesi gerekir.
- Çalışanların ve parametre sunucularının hangi bağlantı noktasını dinlemeleri gerektiğini bilmeleri gerekir. Basitlik adına, bu görevlerde TensorFlow sunucuları oluştururken genellikle tam küme bilgilerini iletebilirsiniz.
- Değerlendirici görevinin eğitim kümesinin kurulumunu bilmesi gerekmez. Varsa, eğitim kümesine bağlanmaya çalışmamalıdır.
- İşçiler ve parametre sunucuları sırasıyla
"worker"
ve"ps"
görev türlerine sahip olmalıdır. Koordinatör, eski nedenlerle görev türü olarak"chief"
kullanmalıdır.
Bu öğreticide, tüm parametre sunucusu eğitiminin Colab'da çalıştırılabilmesi için bir süreç içi küme oluşturacaksınız. Daha sonraki bir bölümde gerçek kümeleri nasıl kuracağınızı öğreneceksiniz.
İşlem içi küme
Önceden birkaç TensorFlow sunucusu oluşturarak başlayacak ve bunlara daha sonra bağlanacaksınız. Bunun yalnızca bu öğreticinin gösterimi amacıyla yapıldığını ve gerçek eğitimde sunucuların "worker"
ve "ps"
makinelerinde başlatılacağını unutmayın.
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name="worker",
task_index=i,
config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec,
job_name="ps",
task_index=i,
protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
İşlem içi küme kurulumu, burada olduğu gibi birim testlerinde sıklıkla kullanılır.
Yerel test için başka bir seçenek de süreçleri yerel makinede başlatmaktır; bu yaklaşımın bir örneği için Keras ile çok çalışanlı eğitime bakın.
Bir ParameterServerStrategy örneğini oluşturun
Eğitim koduna dalmadan önce, bir ParameterServerStrategy
nesnesinin örneğini oluşturalım. Model.fit
ile mi yoksa özel bir eğitim döngüsüyle mi ilerlediğinize bakılmaksızın bunun gerekli olduğunu unutmayın. variable_partitioner
bağımsız değişkeni, Değişken parçalama bölümünde açıklanacaktır.
variable_partitioner = (
tf.distribute.experimental.partitioners.MinSizePartitioner(
min_shard_bytes=(256 << 10),
max_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
tutucu4 l10n-yerINFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0' INFO:tensorflow:Number of GPUs on workers: 1
GPU'ları eğitim için kullanmak için, her çalışanın görebileceği GPU'ları tahsis edin. ParameterServerStrategy
, tüm çalışanların aynı sayıda GPU'ya sahip olması gerektiği kısıtlamasıyla, her çalışanda mevcut tüm GPU'ları kullanır.
Değişken parçalama
Değişken parçalama, bir değişkeni parça adı verilen daha küçük birden çok değişkene bölmeyi ifade eder. Değişken parçalama, bu parçalara erişirken ağ yükünü dağıtmak için faydalı olabilir. Normal bir değişkenin hesaplanmasını ve depolanmasını birden çok parametre sunucusu arasında dağıtmak da yararlıdır.
Değişken parçalamayı etkinleştirmek için, bir ParameterServerStrategy
nesnesi oluştururken bir variable_partitioner
iletebilirsiniz. Değişken_bölümü, bir variable_partitioner
her oluşturulduğunda çağrılır ve değişkenin her boyutu boyunca parça sayısını döndürmesi beklenir. tf.distribute.experimental.partitioners.MinSizePartitioner
gibi bazı kullanıma hazır variable_partitioner
sağlanır. Model eğitim hızı üzerinde olumsuz etkisi olabilecek küçük değişkenleri bölümlemekten kaçınmak için tf.distribute.experimental.partitioners.MinSizePartitioner
gibi boyut tabanlı bölümleyicilerin kullanılması önerilir.
Bir variable_partitioner
geçirildiğinde ve doğrudan strategy.scope()
altında bir değişken oluşturursanız, bu, parça listesine erişim sağlayan variables
özelliğine sahip bir kapsayıcı türü haline gelecektir. Çoğu durumda, bu kapsayıcı, tüm parçaları birleştirerek otomatik olarak bir Tensöre dönüştürülür. Sonuç olarak, normal bir değişken olarak kullanılabilir. Öte yandan, tf.nn.embedding_lookup gibi bazı tf.nn.embedding_lookup
yöntemleri bu kapsayıcı türü için verimli uygulama sağlar ve bu yöntemlerde otomatik birleştirme önlenir.
Daha fazla ayrıntı için lütfen tf.distribute.experimental.ParameterServerStrategy
API belgelerine bakın.
Model.fit
ile eğitim
Keras, kaputun altındaki eğitim döngüsünü, geçersiz train_step
esnekliği ve TensorBoard için kontrol noktası kaydetme veya özet kaydetme gibi işlevler sağlayan geri aramalarla yöneten Model.fit
aracılığıyla kullanımı kolay bir eğitim API'si sağlar. Model.fit
ile aynı eğitim kodu, strateji nesnesinin basit bir takasıyla diğer stratejiler için kullanılabilir.
Giriş verileri
Parametre sunucusu eğitimi ile Model.fit
, giriş verilerinin tf.distribute.InputContext
türünde tek bir argüman alan ve bir tf.data.Dataset
döndüren bir çağrılabilir içinde sağlanmasını gerektirir. Ardından, bu tür callable
alan bir tf.keras.utils.experimental.DatasetCreator
nesnesi ve input_options
bağımsız değişkeni aracılığıyla isteğe bağlı bir tf.distribute.InputOptions
nesnesi oluşturun.
Verileri parametre sunucusu eğitimi ile karıştırmanın ve tekrarlamanın ve kitaplığın çağ sınırlarını bilmesi için fit
steps_per_epoch
belirtmenin tavsiye edildiğini unutmayın.
InputContext
bağımsız değişkeni hakkında daha fazla bilgi için lütfen Dağıtılmış girdi öğreticisine bakın.
def dataset_fn(input_context):
global_batch_size = 64
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.shard(
input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2)
return dataset
dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
dataset_fn
içindeki kod, çalışan makinelerin her birinde genellikle CPU olan giriş aygıtında çağrılır.
Model oluşturma ve derleme
Şimdi, bir tf.keras.Model
bir tf.keras.models.Gösterim amaçlı tf.keras.models.Sequential
model— ve ardından, optimize edici, ölçümler veya steps_per_execution
gibi parametreleri dahil etmek için bir Model.compile
çağrısı oluşturacaksınız:
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)
Geri aramalar ve eğitim
model.fit
asıl eğitim için çağırmadan önce, aşağıdakiler gibi genel görevler için gerekli geri aramaları hazırlayalım:
-
ModelCheckpoint
: model ağırlıklarını kaydetmek için. -
BackupAndRestore
: eğitim ilerlemesinin otomatik olarak yedeklendiğinden ve kümenin kullanılamaması durumunda (iptal etme veya önceden alma gibi) kurtarıldığından emin olmak için; veya -
TensorBoard
: ilerleme raporlarını TensorBoard aracında görselleştirilen özet dosyalara kaydetmek için.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=log_dir),
tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
tutucu8 l10n-yerEpoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step Epoch 2/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step Epoch 3/5 WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. 20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step Epoch 4/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step Epoch 5/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step <keras.callbacks.History at 0x7f89984ca890>
ClusterCoordinator
ile doğrudan kullanım (isteğe bağlı)
Model.fit
eğitim yolunu seçseniz bile, işçiler üzerinde yürütülmesini istediğiniz diğer işlevleri planlamak için isteğe bağlı olarak bir tf.distribute.experimental.coordinator.ClusterCoordinator
nesnesinin örneğini oluşturabilirsiniz. Daha fazla ayrıntı ve örnek için Özel eğitim döngüsüyle eğitim bölümüne bakın.
Özel bir eğitim döngüsüyle eğitim
tf.distribute.Strategy
ile özel eğitim döngülerini kullanmak, eğitim döngülerini tanımlamak için büyük esneklik sağlar. Yukarıda tanımlanan ParameterServerStrategy
ile ( strategy
olarak), eğitim adımlarının yürütülmesini uzaktan çalışanlara göndermek için bir tf.distribute.experimental.coordinator.ClusterCoordinator
kullanacaksınız.
Ardından, diğer tf.distribute.Strategy
s ile eğitim döngüsünde yaptığınız gibi bir model oluşturacak, bir veri kümesi ve bir adım işlevi tanımlayacaksınız. Daha fazla ayrıntıyı tf.distribute.Strategy öğreticisiyle Özel eğitimde bulabilirsiniz.
Verimli veri kümesi önceden getirmeyi sağlamak için aşağıdaki Eğitim adımlarını uzaktan çalışanlara gönderme bölümünde belirtilen önerilen dağıtılmış veri kümesi oluşturma API'lerini kullanın. Ayrıca, çalışanlara tahsis edilen GPU'lardan tam olarak yararlanmak için worker_fn
içinden Strategy.run
çağırdığınızdan emin olun. Adımların geri kalanı, GPU'lu veya GPU'suz eğitim için aynıdır.
Bu bileşenleri aşağıdaki adımlarda oluşturalım:
verileri ayarla
İlk olarak, Keras ön işleme katmanları tarafından uygulanan ön işleme mantığını içeren bir veri kümesi oluşturan bir işlev yazın.
Bu katmanları dataset_fn
dışında yaratacaksınız ama dönüşümü dataset_fn
içinde uygulayacaksınız, çünkü dataset_fn içinde değişkenlerin oluşturulmasına izin vermeyen bir dataset_fn
içine tf.function
.
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=feature_vocab,
mask_token=None)
label_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = tf.keras.layers.Input(
shape=(3,),
dtype=tf.string,
name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = tf.keras.Model(
{"features": raw_feature_input},
feature_id_input)
raw_label_input = tf.keras.layers.Input(
shape=(1,),
dtype=tf.string,
name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = tf.keras.Model(
{"label": raw_label_input},
label_id_input)
tutucu10 l10n-yer/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison return bool(asarray(a1 == a2).all())
Bir veri kümesinde oyuncak örnekleri oluşturun:
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
Ardından, dataset_fn
içine sarılmış eğitim veri kümesini oluşturun:
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
Modeli oluşturun
Ardından, modeli ve diğer nesneleri oluşturun. Tüm değişkenleri strategy.scope
altında oluşturduğunuzdan emin olun.
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with Keras processing layers.
model_input = tf.keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = tf.keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
model = tf.keras.Model({"features": model_input}, dense_output)
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
accuracy = tf.keras.metrics.Accuracy()
FixedShardsPartitioner
kullanımının tüm değişkenleri iki parçaya böldüğünü ve her parçanın farklı parametre sunucularına atandığını doğrulayalım:
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
Eğitim adımını tanımlayın
Üçüncüsü, bir tf.function
içine sarılmış eğitim adımını oluşturun:
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
Yukarıdaki eğitim adımı işlevinde, step_fn
Strategy.run
ve Strategy.reduce
çağrıları, çalışan başına birden çok GPU'yu destekleyebilir. Çalışanlara ayrılmış GPU'lar varsa, Strategy.run
veri kümelerini birden çok kopyaya dağıtır.
Eğitim adımlarını uzaktan çalışanlara gönderin
Tüm hesaplamalar ParameterServerStrategy
tarafından tanımlandıktan sonra, kaynaklar oluşturmak ve eğitim adımlarını uzak çalışanlara dağıtmak için tf.distribute.experimental.coordinator.ClusterCoordinator
sınıfını kullanacaksınız.
Önce bir ClusterCoordinator
nesnesi oluşturalım ve strateji nesnesini iletelim:
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
Ardından, çalışan başına bir veri kümesi ve bir yineleyici oluşturun. Aşağıdaki per_worker_dataset_fn
, GPU'lara sorunsuz bir şekilde verimli bir şekilde önceden getirmeyi sağlamak için dataset_fn
strategy.distribute_datasets_from_function
içine sarmanız önerilir.
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
tutucu18 l10n-yerWARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Son adım, hesaplamayı ClusterCoordinator.schedule
kullanarak uzak çalışanlara dağıtmaktır:
-
schedule
yöntemi birtf.function
ve hemen geleceğe benzer birRemoteValue
döndürür. Kuyruğa alınan işlevler, arka plan iş parçacıklarında uzaktaki çalışanlara gönderilir veRemoteValue
eşzamansız olarak doldurulur. -
ClusterCoordinator.join
join
, tüm zamanlanmış işlevler yürütülene kadar beklemek için kullanılabilir.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
tutucu20 l10n-yerINFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). Finished epoch 0, accuracy is 0.543750. Finished epoch 1, accuracy is 0.543750. Finished epoch 2, accuracy is 0.950000. Finished epoch 3, accuracy is 1.000000.
RemoteValue
sonucunu şu şekilde alabilirsiniz:
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
tutucu22 l10n-yerFinal loss is 0.000000
Alternatif olarak, tüm adımları başlatabilir ve tamamlanmasını beklerken bir şeyler yapabilirsiniz:
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
Bu özel örneğe yönelik eksiksiz eğitim ve sunum iş akışı için lütfen bu testi inceleyin .
Veri kümesi oluşturma hakkında daha fazla bilgi
Yukarıdaki koddaki veri kümesi, ClusterCoordinator.create_per_worker_dataset
API'si kullanılarak oluşturulur). Çalışan başına bir veri kümesi oluşturur ve bir kapsayıcı nesnesi döndürür. Çalışan başına bir yineleyici oluşturmak için iter
yöntemini çağırabilirsiniz. Çalışan başına yineleyici, çalışan başına bir yineleyici içerir ve bir çalışanın karşılık gelen dilimi, işlev belirli bir çalışan üzerinde yürütülmeden önce ClusterCoordinator.schedule
yöntemine geçirilen işlevin giriş bağımsız değişkeninde değiştirilir.
Şu anda, ClusterCoordinator.schedule
yöntemi, çalışanların eşdeğer olduğunu varsayar ve bu nedenle, bir Dataset.shuffle
işlemi içermeleri durumunda farklı şekilde karıştırılabilmeleri dışında, farklı çalışanlar üzerindeki veri kümelerinin aynı olduğunu varsayar. Bu nedenle, veri kümelerinin süresiz olarak tekrarlanması ve bir veri kümesinden OutOfRangeError
güvenmek yerine sınırlı sayıda adım planlamanız da önerilir.
Diğer bir önemli not da, tf.data
veri kümelerinin, görev sınırları arasında örtük serileştirmeyi ve seriyi kaldırmayı desteklememesidir. Bu nedenle, tüm veri kümesini ClusterCoordinator.create_per_worker_dataset
öğesine iletilen işlevin içinde oluşturmak önemlidir.
Değerlendirme
Dağıtılmış eğitimde bir değerlendirme döngüsü tanımlamanın ve çalıştırmanın birden fazla yolu vardır. Her birinin aşağıda açıklandığı gibi kendi artıları ve eksileri vardır. Bir tercihiniz yoksa satır içi değerlendirme yöntemi önerilir.
satır içi değerlendirme
Bu yöntemde koordinatör, eğitim ve değerlendirme arasında geçiş yapar ve bu nedenle buna satır içi değerlendirme denir.
Satır içi değerlendirmenin çeşitli faydaları vardır. Örneğin:
- Tek bir görevin tutamayacağı büyük değerlendirme modellerini ve değerlendirme veri kümelerini destekleyebilir.
- Değerlendirme sonuçları, bir sonraki çağın eğitimi için kararlar almak için kullanılabilir.
Satır içi değerlendirmeyi uygulamanın iki yolu vardır: doğrudan değerlendirme ve dağıtılmış değerlendirme.
- Doğrudan değerlendirme : Küçük modeller ve değerlendirme veri kümeleri için, koordinatör, değerlendirme veri kümesini koordinatörde kullanarak doğrudan dağıtılmış model üzerinde değerlendirme çalıştırabilir:
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = tf.keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print ("Evaluation accuracy: %f" % eval_accuracy.result())
tutucu25 l10n-yerWARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
- Dağıtılmış değerlendirme : Doğrudan koordinatör üzerinde çalıştırılması mümkün olmayan büyük modeller veya veri kümeleri için, koordinatör görevi, değerlendirme görevlerini
ClusterCoordinator.schedule
/ClusterCoordinator.join
yöntemleri aracılığıyla çalışanlara dağıtabilir:
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = tf.keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
tutucu27 l10n-yerWARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources Evaluation accuracy: 1.000000
Yan araç değerlendirmesi
Diğer bir yöntem, kontrol noktalarını tekrar tekrar okuyan ve en son kontrol noktasında değerlendirmeyi çalıştıran özel bir değerlendirici görevi oluşturduğunuz sepet değerlendirmesi olarak adlandırılır. Egzersiz döngünüzü değerlendirme sonuçlarına göre değiştirmeniz gerekmiyorsa, egzersiz programınızın erken bitmesine olanak tanır. Ancak, değerlendirmeyi tetiklemek için ek bir değerlendirici görevi ve periyodik kontrol noktası gerektirir. Olası bir sepet değerlendirme döngüsü aşağıdadır:
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epoches)):
break
Gerçek dünyadaki kümeler
Gerçek bir üretim ortamında, tüm görevleri farklı makinelerde farklı süreçlerde çalıştıracaksınız. Her görevde küme bilgilerini yapılandırmanın en basit yolu, "TF_CONFIG"
ortam değişkenlerini ayarlamak ve "TF_CONFIG"
ayrıştırmak için bir tf.distribute.cluster_resolver.TFConfigClusterResolver
kullanmaktır.
"TF_CONFIG"
ortam değişkenleri hakkında genel bir açıklama için, Dağıtılmış eğitim kılavuzuna bakın.
Eğitim görevlerinizi Kubernetes veya diğer yapılandırma şablonlarını kullanarak başlatırsanız, bu şablonların sizin için zaten “TF_CONFIG"
olması çok olasıdır.
"TF_CONFIG"
ortam değişkenini ayarlayın
3 çalışanınız ve 2 parametre sunucunuz olduğunu varsayalım, işçi 1'in "TF_CONFIG"
olabilir:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
Değerlendiricinin "TF_CONFIG"
değeri şunlar olabilir:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
Değerlendirici için yukarıdaki "TF_CONFIG"
dizisindeki "cluster"
kısmı isteğe bağlıdır.
Tüm görevler için aynı ikili dosyayı kullanırsanız
Tüm bu görevleri tek bir ikili dosya kullanarak çalıştırmayı tercih ederseniz, programınızın en başında farklı rollere ayrılmasına izin vermeniz gerekir:
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# Run side-car evaluation
else:
# Run the coordinator.
Aşağıdaki kod bir TensorFlow sunucusunu başlatır ve bekler:
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
İşlem hatası
işçi hatası
tf.distribute.experimental.coordinator.ClusterCoordinator
veya Model.fit
, çalışan hatası için yerleşik hata toleransı sağlar. Çalışan kurtarıldıktan sonra, veri kümelerini yeniden oluşturmak için çalışanlar üzerinde önceden sağlanan veri kümesi işlevi (özel eğitim döngüsü için ClusterCoordinator.create_per_worker_dataset
veya tf.keras.utils.experimental.DatasetCreator
for Model.fit
için) çağrılır.
Parametre sunucusu veya koordinatör hatası
Ancak, koordinatör bir parametre sunucusu hatası gördüğünde, hemen bir UnavailableError
veya AbortedError
. Bu durumda koordinatörü yeniden başlatabilirsiniz. Koordinatörün kendisi de kullanılamaz hale gelebilir. Bu nedenle, eğitim ilerlemesini kaybetmemek için belirli araçlar önerilir:
Model.fit
için, ilerleme kaydetme ve geri yüklemeyi otomatik olarak gerçekleştiren birBackupAndRestore
geri araması kullanmalısınız. Örnek için yukarıdaki Geri Aramalar ve eğitim bölümüne bakın.Özel bir eğitim döngüsü için, model değişkenlerini periyodik olarak kontrol etmeli ve eğitim başlamadan önce varsa bir kontrol noktasından model değişkenlerini yüklemelisiniz. Bir optimize edici kontrol noktasına sahipse, eğitim ilerlemesi yaklaşık olarak
optimizer.iterations
öğesinden çıkarılabilir:
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epoches):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
RemoteValue
Alma
Bir işlev başarıyla yürütülürse RemoteValue
başarılı olacağı garanti edilir. Bunun nedeni, şu anda bir işlev yürütüldükten sonra dönüş değerinin koordinatöre hemen kopyalanmasıdır. Kopyalama sırasında herhangi bir çalışan hatası varsa, işlev uygun başka bir çalışan üzerinde yeniden denenecektir. Bu nedenle, performans için optimize etmek istiyorsanız, dönüş değeri olmayan işlevleri planlayabilirsiniz.
Hata raporlama
Koordinatör, parametre sunucularından UnavailableError
gibi bir hata veya InvalidArgument
tf.debugging.check_numerics
gibi diğer uygulama hataları gördüğünde, hatayı yükseltmeden önce bekleyen ve kuyruğa alınan tüm işlevleri iptal eder. Karşılık gelen RemoteValue
, CancelledError
yükseltir.
Bir hata oluştuktan sonra, koordinatör aynı hatayı veya iptal edilen fonksiyonlardan herhangi bir hatayı ortaya çıkarmayacaktır.
Performans iyileştirme
ParameterServerStrategy
ve ClusterResolver
ile antrenman yaparken performans sorunları görmenizin birkaç olası nedeni vardır.
Yaygın bir neden, parametre sunucularının dengesiz yüke sahip olması ve bazı ağır yüklü parametre sunucularının kapasiteye ulaşmış olmasıdır. Ayrıca birden fazla kök neden olabilir. Bu sorunu hafifletmek için bazı basit yöntemler şunlardır:
- Bir
ParameterServerStrategy
oluştururken birvariable_partitioner
belirterek büyük model değişkenlerinizi parçalayın. - Mümkünse, tüm parametre sunucularının tek adımda ihtiyaç duyduğu bir etkin nokta değişkeni oluşturmaktan kaçının. Örneğin, varsayılan davranış, öğrenme hızının belirli bir parametre sunucusuna yerleştirilen ve her adımda diğer tüm parametre sunucuları tarafından istenen bir değişken haline gelmesi olduğundan, optimize edicilerde sabit bir öğrenme oranı veya alt sınıf
tf.keras.optimizers.schedules.LearningRateSchedule
. . - Büyük kelime dağarcığınızı Keras ön işleme katmanlarına geçirmeden önce karıştırın.
Performans sorunlarının bir başka olası nedeni de koordinatördür. İlk schedule
/ join
uygulamanız Python tabanlıdır ve bu nedenle iş parçacığı ek yüküne sahip olabilir. Ayrıca koordinatör ve işçiler arasındaki gecikme büyük olabilir. Eğer durum buysa,
Model.fit
için,steps_per_execution
sağlananstep_per_execution bağımsız değişkeniniModel.compile
büyük bir değere ayarlayabilirsiniz.Özel bir eğitim döngüsü için, birden çok adımı tek bir
tf.function
:
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
Kitaplık daha da optimize edildiğinden, umarım çoğu kullanıcı gelecekte adımları manuel olarak paketlemek zorunda kalmaz.
Ek olarak, performans iyileştirme için küçük bir numara, yukarıdaki görev hatası işleme bölümünde açıklandığı gibi, işlevleri bir dönüş değeri olmadan programlamaktır.
Bilinen sınırlamalar
Bilinen sınırlamaların çoğu zaten yukarıdaki bölümlerde ele alınmıştır. Bu bölüm bir özet sunar.
ParameterServerStrategy
genel
- Hata toleransının düzgün çalışması için koordinatör dahil her görevde
os.environment["grpc_fail_fast"]="use_caller"
gereklidir. - Senkronize parametre sunucusu eğitimi desteklenmez.
- Optimum performansı elde etmek için genellikle birden fazla adımı tek bir işlevde toplamak gerekir.
- Parçalanmış değişkenler içeren bir
tf.saved_model.load
tf.saved_model.load yoluyla yüklenmesi desteklenmez. Böyle bir save_modelinin TensorFlow Serving kullanılarak yüklenmesinin işe yarayacağını unutmayın. - Parçalanmış optimize edici yuva değişkenlerini içeren bir kontrol noktasının farklı sayıda parçaya yüklenmesi desteklenmez.
- Koordinatör görevini yeniden başlatmadan parametre sunucusu hatasından kurtarma işlemi desteklenmez.
-
tf.lookup.StaticHashTable
(genellikletf.keras.layers.IntegerLookup
,tf.keras.layers.StringLookup
vetf.keras.layers.TextVectorization
gibi bazı Keras ön işleme katmanları tarafından kullanılır) kullanımı, kaynakların koordinatör şu anda parametre sunucusu eğitimi ile. Bunun, çalışanlardan koordinatöre kadar arama RPC'leri için performans etkileri vardır. Bu, ele alınması gereken mevcut bir yüksek önceliktir.
Model.fit
özellikleri
-
steps_per_epoch
Model.fit
değişkeni gereklidir. Bir çağda uygun aralıkları sağlayan bir değer seçebilirsiniz. -
ParameterServerStrategy
, performans nedenleriyle toplu düzeyde çağrılara sahip özel geri aramaları desteklemez. Bu çağrıları, uygun şekilde seçilmişsteps_per_epoch
ile çağ düzeyinde çağrılara dönüştürmelisiniz, böylece hersteps_per_epoch
adım sayısı olarak adlandırılırlar. Yerleşik geri aramalar etkilenmez: toplu düzeydeki aramaları, performans gösterecek şekilde değiştirildi.ParameterServerStrategy
için toplu düzeyde çağrıların desteklenmesi planlanmaktadır. - Aynı nedenle, diğer stratejilerden farklı olarak, ilerleme çubuğu ve metrikler yalnızca dönem sınırlarında günlüğe kaydedilir.
-
run_eagerly
desteklenmiyor.
Özel eğitim döngüsü özellikleri
-
ClusterCoordinator.schedule
, bir veri kümesi için ziyaret garantilerini desteklemez. -
ClusterCoordinator.create_per_worker_dataset
kullanıldığında, tüm veri kümesi, kendisine iletilen işlevin içinde oluşturulmalıdır. -
tf.data.Options
,ClusterCoordinator.create_per_worker_dataset
tarafından oluşturulan bir veri kümesinde yoksayılır.