Niestandardowa pętla treningowa z Keras i MultiWorkerMirroredStrategy

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Przegląd

W tym samouczku przedstawiono szkolenie dla wielu pracowników przy użyciu niestandardowego interfejsu API pętli szkoleniowej, dystrybuowanego za pośrednictwem MultiWorkerMirroredStrategy, dzięki czemu model Keras zaprojektowany do pracy z jednym pracownikiem może bezproblemowo pracować z wieloma pracownikami przy minimalnej zmianie kodu.

Używamy niestandardowych pętli treningowych do trenowania naszego modelu, ponieważ zapewniają nam elastyczność i większą kontrolę nad treningiem. Co więcej, łatwiej jest debugować model i pętlę treningową. Bardziej szczegółowe informacje znajdziesz w Pisanie pętli treningowej od podstaw .

Jeśli szukasz sposobu korzystania z MultiWorkerMirroredStrategy z keras model.fit , zapoznaj się z tym samouczkiem .

Przewodnik dotyczący szkoleń rozproszonych w TensorFlow zawiera przegląd strategii dystrybucji, które TensorFlow obsługuje dla osób zainteresowanych głębszym zrozumieniem interfejsów API tf.distribute.Strategy .

Ustawiać

Po pierwsze, niektóre niezbędne importy.

import json
import os
import sys

Przed zaimportowaniem TensorFlow wprowadź kilka zmian w środowisku.

Wyłącz wszystkie procesory graficzne. Zapobiega to błędom powodowanym przez wszystkich pracowników próbujących korzystać z tego samego GPU. W przypadku rzeczywistej aplikacji każdy pracownik byłby na innej maszynie.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Zresetuj zmienną środowiskową TF_CONFIG , więcej o tym dowiesz się później.

os.environ.pop('TF_CONFIG', None)

Upewnij się, że bieżący katalog znajduje się na ścieżce Pythona. Dzięki temu notatnik może później zaimportować pliki zapisane przez %%writefile .

if '.' not in sys.path:
  sys.path.insert(0, '.')

Teraz zaimportuj TensorFlow.

import tensorflow as tf

Zbiór danych i definicja modelu

Następnie utwórz plik mnist.py z prostą konfiguracją modelu i zestawu danych. Ten plik Pythona będzie używany przez procesy robocze w tym samouczku:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

Konfiguracja dla wielu pracowników

Teraz wejdźmy w świat szkoleń wieloosobowych. W TensorFlow zmienna środowiskowa TF_CONFIG jest wymagana do uczenia na wielu komputerach, z których każdy może mieć inną rolę. TF_CONFIG użyty poniżej to ciąg JSON używany do określenia konfiguracji klastra na każdym procesie roboczym, który jest częścią klastra. Jest to domyślna metoda określania klastra przy użyciu cluster_resolver.TFConfigClusterResolver , ale w module distribute.cluster_resolver są dostępne inne opcje.

Opisz swój klaster

Oto przykładowa konfiguracja:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Oto ten sam TF_CONFIG zserializowany jako ciąg JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Istnieją dwa składniki TF_CONFIG : cluster i task .

  • cluster jest taki sam dla wszystkich pracowników i dostarcza informacji o klastrze szkoleniowym, który jest dyktatem składającym się z różnych rodzajów zawodów, takich jak worker . W szkoleniu dla wielu pracowników z MultiWorkerMirroredStrategy , zazwyczaj jeden worker bierze na siebie nieco większą odpowiedzialność, jak zapisywanie punktu kontrolnego i pisanie pliku podsumowującego dla TensorBoard, oprócz tego, co robi zwykły worker . Taki pracownik jest nazywany chief pracownikiem, a zwyczajem jest, że worker z index 0 jest mianowany głównym worker (w rzeczywistości tak realizuje się tf.distribute.Strategy ).

  • task dostarcza informacji o bieżącym zadaniu i jest inne dla każdego pracownika. Określa type i index tego pracownika.

W tym przykładzie ustawisz type zadania na "worker" a index zadania na 0 . Ta maszyna jest pierwszym pracownikiem i zostanie wyznaczona jako główny pracownik i wykona więcej pracy niż inni. Należy zauważyć, że inne komputery również będą musiały mieć ustawioną zmienną środowiskową TF_CONFIG , która powinna mieć ten sam dyktat cluster , ale inny type zadania lub index zadania w zależności od ról tych komputerów.

W celach ilustracyjnych, ten samouczek pokazuje, jak można ustawić TF_CONFIG z 2 procesami roboczymi na localhost . W praktyce użytkownicy tworzyliby wiele procesów roboczych na zewnętrznych adresach IP/portach i odpowiednio ustawiali TF_CONFIG na każdym procesie roboczym.

W tym przykładzie użyjesz 2 robotników, TF_CONFIG pierwszego robotnika pokazano powyżej. Dla drugiego pracownika ustawisz tf_config['task']['index']=1

Powyżej tf_config jest tylko zmienną lokalną w Pythonie. Aby faktycznie użyć go do skonfigurowania uczenia, ten słownik musi być serializowany jako JSON i umieszczony w zmiennej środowiskowej TF_CONFIG .

Zmienne środowiskowe i podprocesy w notebookach

Podprocesy dziedziczą zmienne środowiskowe po swoich rodzicach. Więc jeśli ustawisz zmienną środowiskową w tym jupyter notebook :

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Dostęp do zmiennej środowiskowej można uzyskać z podprocesów:

echo ${GREETINGS}
Hello TensorFlow!

W następnej sekcji użyjesz tego do przekazania TF_CONFIG do podprocesów roboczych. Nigdy nie uruchamiałbyś swoich zadań w ten sposób, ale to wystarczy do celów tego samouczka: aby zademonstrować minimalny przykład wielu pracowników.

MultiWorkerMirroredStrategy

Aby wytrenować model, użyj wystąpienia tf.distribute.MultiWorkerMirroredStrategy , które tworzy kopie wszystkich zmiennych w warstwach modelu na każdym urządzeniu we wszystkich procesach roboczych. Przewodnik tf.distribute.Strategy zawiera więcej szczegółów na temat tej strategii.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

Użyj tf.distribute.Strategy.scope , aby określić, że strategia powinna być używana podczas budowania modelu. To umieszcza Cię w „ kontekście między replikami ” dla tej strategii, co oznacza, że ​​strategia ma kontrolę nad takimi rzeczami, jak zmienne rozmieszczenie.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

Automatycznie dziel dane między pracownikami

W przypadku szkolenia z udziałem wielu pracowników dzielenie zestawu danych na fragmenty niekoniecznie jest konieczne, jednak zapewnia jednorazową semantykę, dzięki czemu szkolenie jest bardziej powtarzalne, tj. szkolenie na wielu pracownikach powinno być takie samo, jak szkolenie na jednym pracowniku. Uwaga: w niektórych przypadkach może to mieć wpływ na wydajność.

Zobacz: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

Zdefiniuj niestandardową pętlę treningową i trenuj model

Określ optymalizator

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

Zdefiniuj krok szkolenia za pomocą tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Zapisywanie i przywracanie punktu kontrolnego

Implementacja punktów kontrolnych w niestandardowej pętli szkoleniowej wymaga od użytkownika obsługi tego zamiast używania wywołania zwrotnego Keras. Pozwala na zapisywanie wag modelu i przywracanie ich bez konieczności zapisywania całego modelu.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

Tutaj utworzysz jeden tf.train.Checkpoint , który śledzi model, który jest zarządzany przez tf.train.CheckpointManager , dzięki czemu zachowany zostanie tylko najnowszy punkt kontrolny.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Teraz, gdy musisz przywrócić, możesz znaleźć najnowszy punkt kontrolny zapisany za pomocą wygodnej funkcji tf.train.latest_checkpoint .

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

Po przywróceniu punktu kontrolnego możesz kontynuować trening własnej pętli treningowej.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.849107, train_loss: 0.491886.
Epoch: 1, accuracy: 0.937835, train_loss: 0.197650.
Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.

Pełna konfiguracja kodu dla pracowników

Aby faktycznie działać z MultiWorkerMirroredStrategy , musisz uruchomić procesy robocze i przekazać im TF_CONFIG .

Podobnie jak napisany wcześniej plik mnist.py , tutaj jest main.py , który zawiera ten sam kod, który przeszliśmy krok po kroku wcześniej w tym colab, po prostu zapisujemy go do pliku, aby każdy z pracowników go uruchomił:

Plik: main.py

Writing main.py

Trenuj i oceniaj

Bieżący katalog zawiera teraz oba pliki Pythona:

ls *.py
main.py
mnist.py

Więc json-serializuj TF_CONFIG i dodaj go do zmiennych środowiskowych:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Teraz możesz uruchomić proces roboczy, który uruchomi main.py i użyje TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

W powyższym poleceniu należy zwrócić uwagę na kilka rzeczy:

  1. Używa %%bash , który jest "magią" notatnika do uruchamiania niektórych poleceń basha.
  2. Używa flagi --bg do uruchomienia procesu bash w tle, ponieważ ten proces roboczy nie zostanie zakończony. Czeka na wszystkich pracowników, zanim się zacznie.

Proces roboczy działający w tle nie wydrukuje danych wyjściowych do tego notatnika, więc &> przekierowuje dane wyjściowe do pliku, dzięki czemu można zobaczyć, co się stało.

Poczekaj więc kilka sekund, aż proces się uruchomi:

import time
time.sleep(20)

Teraz spójrz, co do tej pory zostało wyprowadzone do pliku dziennika pracownika:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

Ostatnia linia pliku dziennika powinna Started server with target: grpc://localhost:12345 . Pierwszy pracownik jest teraz gotowy i czeka, aż pozostali pracownicy będą gotowi do pracy.

Zaktualizuj więc tf_config dla procesu drugiego pracownika, aby pobrać:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Teraz uruchom drugiego robotnika. Rozpocznie to szkolenie, ponieważ wszyscy pracownicy są aktywni (więc nie ma potrzeby wprowadzania tego procesu w tle):

python main.py > /dev/null 2>&1

Teraz, jeśli ponownie sprawdzisz dzienniki zapisane przez pierwszego pracownika, zobaczysz, że uczestniczył on w szkoleniu tego modelu:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.832589, train_loss: 0.531260.
Epoch: 1, accuracy: 0.936161, train_loss: 0.214774.
Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Dogłębne szkolenie dla wielu pracowników

W tym samouczku zademonstrowano przepływ pracy Custom Training Loop w konfiguracji wieloosobowej. Szczegółowy opis innych tematów jest dostępny w model.fit's guide konfiguracji wielu pracowników i ma zastosowanie do CTL.

Zobacz też

  1. Poradnik dotyczący szkolenia rozproszonego w TensorFlow zawiera przegląd dostępnych strategii dystrybucji.
  2. Oficjalne modele , z których wiele można skonfigurować do obsługi wielu strategii dystrybucji.
  3. Sekcja Wydajność w przewodniku zawiera informacje o innych strategiach i narzędziach , których można użyć do optymalizacji wydajności modeli TensorFlow.