Keras ve MultiWorkerMirroredStrategy ile özel eğitim döngüsü

TensorFlow.org'da görüntüleyin Google Colab'da çalıştırın Kaynağı GitHub'da görüntüleyin Not defterini indir

genel bakış

Bu eğitici, MultiWorkerMirroredStrategy aracılığıyla dağıtılan özel eğitim döngüsü API'si ile çok çalışanlı eğitimi gösterir, böylece tek çalışan üzerinde çalışacak şekilde tasarlanmış bir Keras modeli, minimum kod değişikliği ile birden çok çalışan üzerinde sorunsuz bir şekilde çalışabilir.

Modelimizi eğitmek için özel eğitim döngüleri kullanıyoruz çünkü bunlar bize eğitim üzerinde esneklik ve daha fazla kontrol sağlıyor. Ayrıca, modelde ve eğitim döngüsünde hata ayıklamak daha kolaydır. Daha ayrıntılı bilgileri Sıfırdan eğitim döngüsü yazma bölümünde bulabilirsiniz.

MultiWorkerMirroredStrategy'yi model.fit MultiWorkerMirroredStrategy ile nasıl kullanacağınızı arıyorsanız, bunun yerine bu eğiticiye bakın.

TensorFlow'da Dağıtılmış Eğitim kılavuzu, tf.distribute.Strategy API'lerini daha derinlemesine anlamak isteyenler için TensorFlow'un desteklediği dağıtım stratejilerine genel bir bakış için mevcuttur.

Kurmak

İlk olarak, bazı gerekli ithalatlar.

import json
import os
import sys

TensorFlow'u içe aktarmadan önce ortamda birkaç değişiklik yapın.

Tüm GPU'ları devre dışı bırakın. Bu, aynı GPU'yu kullanmaya çalışan çalışanların neden olduğu hataları önler. Gerçek bir uygulama için her işçi farklı bir makinede olacaktır.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

TF_CONFIG ortam değişkenini sıfırlayın, daha sonra bununla ilgili daha fazlasını göreceksiniz.

os.environ.pop('TF_CONFIG', None)

Geçerli dizinin python yolunda olduğundan emin olun. Bu, not defterinin %%writefile tarafından yazılan dosyaları daha sonra içe aktarmasına olanak tanır.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Şimdi TensorFlow'u içe aktarın.

import tensorflow as tf

Veri kümesi ve model tanımı

Ardından, basit bir model ve veri kümesi kurulumuyla bir mnist.py dosyası oluşturun. Bu python dosyası, bu öğreticide çalışan işlemler tarafından kullanılacaktır:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
tutucu6 l10n-yer
Writing mnist.py

Çok Çalışan Yapılandırması

Şimdi çok işçili eğitim dünyasına girelim. TensorFlow'da, her biri muhtemelen farklı bir role sahip olan birden çok makinede eğitim için TF_CONFIG ortam değişkeni gereklidir. Aşağıda kullanılan TF_CONFIG , kümenin parçası olan her çalışanda küme yapılandırmasını belirtmek için kullanılan bir JSON dizesidir. Bu, cluster_resolver.TFConfigClusterResolver kullanılarak bir küme belirtmek için varsayılan yöntemdir, ancak distribute.cluster_resolver modülünde kullanılabilen başka seçenekler de vardır.

Kümenizi tanımlayın

İşte örnek bir yapılandırma:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

İşte bir JSON dizesi olarak serileştirilmiş aynı TF_CONFIG :

json.dumps(tf_config)
tutucu9 l10n-yer
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

TF_CONFIG iki bileşeni vardır: cluster ve task .

  • cluster tüm işçiler için aynıdır ve worker gibi farklı iş türlerinden oluşan bir dikt olan eğitim kümesi hakkında bilgi sağlar. MultiWorkerMirroredStrategy ile çok çalışanlı eğitimde, normal bir worker yaptığına ek olarak, genellikle kontrol noktasını kaydetme ve TensorBoard için özet dosyası yazma gibi biraz daha fazla sorumluluk alan bir worker vardır. Böyle bir işçiye chief işçi denir ve 0 index worker baş worker olarak atanması adettendir (aslında tf.distribute.Strategy bu şekilde uygulanır).

  • task , mevcut görev hakkında bilgi sağlar ve her çalışan için farklıdır. Bu çalışanın type ve index belirtir.

Bu örnekte, görev type "worker" ve görev index 0 olarak ayarladınız. Bu makine ilk işçidir ve baş işçi olarak atanacak ve diğerlerinden daha fazla iş yapacaktır. Diğer makinelerin de TF_CONFIG ortam değişkeninin ayarlanması gerekeceğini ve bu makinelerin rollerinin ne olduğuna bağlı olarak aynı cluster diktesine, ancak farklı görev type veya görev index sahip olması gerektiğini unutmayın.

Örnekleme amacıyla, bu öğretici, localhost üzerinde 2 işçi ile bir TF_CONFIG nasıl ayarlanabileceğini gösterir. Uygulamada, kullanıcılar harici IP adreslerinde/portlarında birden çok işçi oluşturacak ve her bir çalışana uygun şekilde TF_CONFIG ayarlayacaktır.

Bu örnekte 2 işçi kullanacaksınız, birinci işçinin TF_CONFIG yukarıda gösterilmiştir. İkinci çalışan için tf_config['task']['index']=1 ayarlarsınız

Yukarıda, tf_config sadece yerel bir değişkendir. Eğitimi yapılandırmak için gerçekten kullanmak için, bu sözlüğün JSON olarak serileştirilmesi ve TF_CONFIG ortam değişkenine yerleştirilmesi gerekir.

Not defterlerinde ortam değişkenleri ve alt süreçler

Alt işlemler, ortam değişkenlerini üstlerinden devralır. Dolayısıyla, bu jupyter notebook işleminde bir ortam değişkeni ayarlarsanız:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Ortam değişkenine bir alt işlemden erişebilirsiniz:

echo ${GREETINGS}
tutucu12 l10n-yer
Hello TensorFlow!

Sonraki bölümde, TF_CONFIG çalışan alt süreçlerine geçirmek için bunu kullanacaksınız. İşlerinizi asla bu şekilde başlatmazsınız, ancak bu öğreticinin amaçları için yeterlidir: Minimal bir çok işçili örnek göstermek.

Çoklu ÇalışanYansıtmalıStrateji

Modeli eğitmek için, tüm çalışanlar arasında her aygıtta modelin katmanlarındaki tüm değişkenlerin kopyalarını oluşturan tf.distribute.MultiWorkerMirroredStrategy örneğini kullanın. tf.distribute.Strategy kılavuzunda bu strateji hakkında daha fazla ayrıntı bulunmaktadır.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
tutucu14 l10n-yer
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

Modelinizi oluştururken bir stratejinin kullanılması gerektiğini belirtmek için tf.distribute.Strategy.scope kullanın. Bu, sizi bu strateji için " çapraz çoğaltma bağlamına " sokar; bu, stratejinin değişken yerleştirme gibi şeylerin kontrolünün altına alındığı anlamına gelir.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

Verilerinizi çalışanlar arasında otomatik olarak parçalayın

Çok çalışanlı eğitimde, veri kümesi parçalama mutlaka gerekli değildir, ancak size tam olarak bir kez daha fazla eğitimi daha tekrarlanabilir hale getiren anlambilim sağlar, yani birden fazla çalışanın eğitimi, bir çalışanın eğitimiyle aynı olmalıdır. Not: performans bazı durumlarda etkilenebilir.

Bakınız: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

Özel Eğitim Döngüsünü Tanımlayın ve Modeli Eğitin

Bir optimize edici belirtin

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

tf.function ile bir eğitim adımı tanımlayın

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Kontrol noktası kaydetme ve geri yükleme

Özel Eğitim Döngüsündeki denetim noktası uygulaması, kullanıcının bir keras geri araması kullanmak yerine bunu işlemesini gerektirir. Modelin ağırlıklarını kaydetmenize ve tüm modeli kaydetmenize gerek kalmadan geri yüklemenize olanak tanır.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

Burada, bir tf.train.CheckpointManager tarafından yönetilen modeli izleyen bir tf.train.Checkpoint oluşturacaksınız, böylece yalnızca en son kontrol noktası korunur.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Şimdi, geri yüklemeniz gerektiğinde, kullanışlı tf.train.latest_checkpoint işlevini kullanarak kaydedilen en son kontrol noktasını bulabilirsiniz.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

Kontrol noktasını geri yükledikten sonra özel eğitim döngünüzü eğitmeye devam edebilirsiniz.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
tutucu23 l10n-yer
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.849107, train_loss: 0.491886.
Epoch: 1, accuracy: 0.937835, train_loss: 0.197650.
Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.

İşçiler üzerinde tam kod kurulumu

Gerçekten MultiWorkerMirroredStrategy ile çalışmak için çalışan süreçleri çalıştırmanız ve onlara bir TF_CONFIG gerekir.

Daha önce yazılmış mnist.py dosyası gibi, işte bu ortak çalışma içinde adım adım incelediğimiz kodun aynısını içeren main.py , her bir çalışanın onu çalıştırması için sadece bir dosyaya yazıyoruz:

Dosya: main.py

Writing main.py

Eğitin ve Değerlendirin

Geçerli dizin artık her iki Python dosyasını da içeriyor:

ls *.py
tutucu27 l10n-yer
main.py
mnist.py

Böylece, TF_CONFIG json-seri hale getirin ve onu ortam değişkenlerine ekleyin:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Şimdi, TF_CONFIG main.py bir çalışan işlemi başlatabilirsiniz:

# first kill any previous runs
%killbgscripts
All background processes were killed.
-yer tutucu31 l10n-yer
python main.py &> job_0.log

Yukarıdaki komutla ilgili dikkat edilmesi gereken birkaç nokta vardır:

  1. Bazı bash komutlarını çalıştırmak için bir dizüstü bilgisayar "sihri" olan %%bash kullanır.
  2. Bu çalışan sonlandırılmayacağından, bash işlemini arka planda çalıştırmak için --bg bayrağını kullanır. Başlamadan önce tüm çalışanları bekler.

Arka planda çalışan işlem, çıktıyı bu not defterine yazdırmaz, bu nedenle &> çıktısını bir dosyaya yönlendirir, böylece ne olduğunu görebilirsiniz.

Bu nedenle, işlemin başlaması için birkaç saniye bekleyin:

import time
time.sleep(20)

Şimdi, işçinin günlük dosyasına şu ana kadar ne çıktı olduğuna bakın:

cat job_0.log
tutucu34 l10n-yer
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

Günlük dosyasının son satırı şunları söylemelidir Started server with target: grpc://localhost:12345 . İlk işçi şimdi hazırdır ve diğer tüm işçi(ler)in ilerlemeye hazır olmasını beklemektedir.

Bu nedenle, ikinci çalışanın alması için tf_config güncelleyin:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Şimdi ikinci işçiyi çalıştırın. Bu, tüm çalışanlar aktif olduğu için eğitimi başlatacaktır (bu nedenle bu süreci arka plana atmaya gerek yoktur):

python main.py > /dev/null 2>&1

Şimdi, ilk işçi tarafından yazılan günlükleri tekrar kontrol ederseniz, o modelin eğitimine katıldığını göreceksiniz:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.832589, train_loss: 0.531260.
Epoch: 1, accuracy: 0.936161, train_loss: 0.214774.
Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
yer tutucu39 l10n-yer
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Çok çalışanlı derinlemesine eğitim

Bu eğitici, çok çalışanlı kurulumun bir Custom Training Loop iş akışını göstermiştir. Diğer konuların ayrıntılı açıklaması model.fit's guide mevcuttur ve CTL'ler için geçerlidir.

Ayrıca bakınız

  1. TensorFlow'da Dağıtılmış Eğitim kılavuzu, mevcut dağıtım stratejilerine genel bir bakış sağlar.
  2. Birçoğu birden fazla dağıtım stratejisini çalıştıracak şekilde yapılandırılabilen resmi modeller .
  3. Kılavuzdaki Performans bölümü , TensorFlow modellerinizin performansını optimize etmek için kullanabileceğiniz diğer stratejiler ve araçlar hakkında bilgi sağlar.