Trening serwera parametrów za pomocą ParameterServerStrategy

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Przegląd

Uczenie serwera parametrów to powszechna metoda równoległości danych służąca do skalowania uczenia modelu w górę na wielu komputerach.

Klaster uczący serwera parametrów składa się z procesów roboczych i serwerów parametrów . Zmienne są tworzone na serwerach parametrów i na każdym kroku są odczytywane i aktualizowane przez pracowników. Domyślnie pracownicy odczytują i aktualizują te zmienne niezależnie bez wzajemnej synchronizacji. Dlatego czasami uczenie parametrów w stylu serwera jest nazywane uczeniem asynchronicznym .

W TensorFlow 2 uczenie serwera parametrów jest obsługiwane przez klasę tf.distribute.experimental.ParameterServerStrategy , która dystrybuuje kroki uczenia do klastra skalowanego do tysięcy pracowników (wraz z serwerami parametrów).

Obsługiwane metody szkoleniowe

Istnieją dwie główne wspierane metody szkoleniowe:

Klaster z zadaniami i zadaniami

Niezależnie od wybranego interfejsu API ( Model.fit lub niestandardowa pętla szkoleniowa), szkolenie rozproszone w TensorFlow 2 obejmuje: 'cluster' z kilkoma 'jobs' , a każde z zadań może mieć jedno lub więcej 'tasks' .

W przypadku korzystania z uczenia serwera parametrów zaleca się posiadanie:

  • Jedno stanowisko koordynatora (które ma stanowisko chief )
  • Wiele miejsc pracy (nazwa stanowiska worker ); oraz
  • Zadania serwera z wieloma parametrami (nazwa zadania ps )

Podczas gdy koordynator tworzy zasoby, rozsyła zadania szkoleniowe, zapisuje punkty kontrolne i zajmuje się niepowodzeniem zadań, procesy robocze i serwery parametrów uruchamiają tf.distribute.Server , który nasłuchuje żądań od koordynatora.

Trening parametrów serwera za pomocą Model.fit API

Uczenie serwera parametrów za pomocą interfejsu API Model.fit wymaga od koordynatora użycia obiektu tf.distribute.experimental.ParameterServerStrategy oraz obiektu tf.keras.utils.experimental.DatasetCreator jako danych wejściowych. Podobnie jak w przypadku użycia Model.fit bez strategii lub z innymi strategiami, przepływ pracy obejmuje tworzenie i kompilację modelu, przygotowanie wywołań zwrotnych, a następnie wywołanie Model.fit .

Trening parametrów serwera z niestandardową pętlą treningową

W przypadku niestandardowych pętli szkoleniowych klasa tf.distribute.experimental.coordinator.ClusterCoordinator jest kluczowym komponentem używanym przez koordynatora.

Najważniejszym API udostępnianym przez obiekt ClusterCoordinator jest schedule :

  • schedule API umieszcza w kolejce funkcję tf.function i natychmiast zwraca wartość RemoteValue podobną do przyszłości.
  • Funkcje umieszczone w kolejce zostaną wysłane do pracowników zdalnych w wątkach w tle, a ich RemoteValue zostaną wypełnione asynchronicznie.
  • Ponieważ schedule nie wymaga przypisania pracownika, przekazana tf.function może zostać wykonana na dowolnym dostępnym pracowniku.
  • Jeśli pracownik, na którym jest wykonywana, stanie się niedostępny przed jej zakończeniem, funkcja zostanie ponowiona na innym dostępnym pracowniku.
  • Ze względu na ten fakt oraz fakt, że wykonanie funkcji nie jest niepodzielne, funkcja może być wykonana więcej niż raz.

Oprócz rozsyłania funkcji zdalnych ClusterCoordinator pomaga również w tworzeniu zestawów danych dotyczących wszystkich pracowników i odbudowie tych zestawów danych, gdy pracownik odzyskuje siły po awarii.

Konfiguracja samouczka

Samouczek rozgałęzia się na Model.fit i niestandardowe ścieżki pętli treningowych, a Ty możesz wybrać tę, która odpowiada Twoim potrzebom. Sekcje inne niż „Trening z X” mają zastosowanie do obu ścieżek.

pip install portpicker

Konfiguracja klastra

Jak wspomniano powyżej, klaster szkoleniowy serwera parametrów wymaga zadania koordynatora, które uruchamia program szkoleniowy, jednego lub kilku procesów roboczych i zadań serwera parametrów, które uruchamiają serwery TensorFlow — tf.distribute.Server — i ewentualnie dodatkowego zadania ewaluacyjnego, które uruchamia ewaluację boczną (patrz sekcja dotycząca oceny wózka bocznego poniżej). Wymagania do ich założenia to:

  • Zadanie koordynatora musi znać adresy i porty wszystkich innych serwerów TensorFlow z wyjątkiem oceniającego.
  • Pracownicy i serwery parametrów muszą wiedzieć, na którym porcie mają nasłuchiwać. Dla uproszczenia zazwyczaj można przekazać pełne informacje o klastrze podczas tworzenia serwerów TensorFlow do tych zadań.
  • Zadanie oceniającego nie musi znać konfiguracji klastra szkoleniowego. Jeśli tak, nie powinien próbować nawiązać połączenia z klastrem szkoleniowym.
  • Procesy robocze i serwery parametrów powinny mieć typy zadań odpowiednio jako "worker" i "ps" . Koordynator powinien używać "chief" jako typu zadania ze względów historycznych.

W tym samouczku utworzysz klaster wewnątrzprocesowy, aby całe szkolenie dotyczące serwera parametrów można było uruchomić w Colab. W dalszej części dowiesz się, jak skonfigurować prawdziwe klastry .

Klaster w procesie

Zaczniesz od utworzenia kilku serwerów TensorFlow z wyprzedzeniem i połączysz się z nimi później. Zwróć uwagę, że jest to tylko do celów demonstracyjnych tego samouczka, aw prawdziwym szkoleniu serwery będą uruchamiane na maszynach "worker" i "ps" .

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

Konfiguracja klastra w procesie jest często używana w testach jednostkowych, takich jak tutaj .

Inną opcją testowania lokalnego jest uruchamianie procesów na komputerze lokalnym — zobacz Szkolenie dla wielu pracowników z Keras , aby zapoznać się z przykładem takiego podejścia.

Utwórz wystąpienie ParameterServerStrategy

Zanim zagłębisz się w kod szkoleniowy, stwórzmy wystąpienie obiektu ParameterServerStrategy . Pamiętaj, że jest to potrzebne niezależnie od tego, czy korzystasz z Model.fit , czy z niestandardową pętlą treningową. Argument variable_partitioner zostanie wyjaśniony w sekcji Variable sharding .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

Aby używać procesorów GPU do szkolenia, przydziel procesory GPU widoczne dla każdego pracownika. ParameterServerStrategy użyje wszystkich dostępnych procesorów GPU na każdym procesie roboczym, z zastrzeżeniem, że wszyscy pracownicy powinni mieć taką samą liczbę dostępnych procesorów GPU.

Zmienna sharding

Fragmentacja zmiennych odnosi się do dzielenia zmiennej na wiele mniejszych zmiennych, które są nazywane shards . Fragmentowanie zmiennych może być przydatne do rozłożenia obciążenia sieci podczas uzyskiwania dostępu do tych fragmentów. Przydatne jest również rozdzielenie obliczeń i przechowywania normalnej zmiennej na wiele serwerów parametrów.

Aby włączyć dzielenie na fragmenty zmiennych, można przekazać variable_partitioner podczas konstruowania obiektu ParameterServerStrategy . variable_partitioner będzie wywoływany za każdym razem, gdy tworzona jest zmienna i oczekuje się, że zwróci liczbę fragmentów wzdłuż każdego wymiaru zmiennej. Niektóre out-of-box variable_partitioner s są dostarczane, takie jak tf.distribute.experimental.partitioners.MinSizePartitioner . Zaleca się używanie partycjonatorów opartych na rozmiarze, takich jak tf.distribute.experimental.partitioners.MinSizePartitioner , aby uniknąć partycjonowania małych zmiennych, które mogą mieć negatywny wpływ na szybkość uczenia modelu.

Po przekazaniu variable_partitioner i jeśli utworzysz zmienną bezpośrednio w strategy.scope() , stanie się ona typem kontenera z właściwością variables , która zapewnia dostęp do listy fragmentów. W większości przypadków ten kontener zostanie automatycznie przekonwertowany na tensor poprzez połączenie wszystkich odłamków. W rezultacie może być używana jako normalna zmienna. Z drugiej strony, niektóre metody TensorFlow, takie jak tf.nn.embedding_lookup , zapewniają wydajną implementację dla tego typu kontenera i w tych metodach można uniknąć automatycznego łączenia.

Więcej informacji można znaleźć w dokumentacji interfejsu API tf.distribute.experimental.ParameterServerStrategy .

Trening z Model.fit

Keras zapewnia łatwy w użyciu interfejs API treningowy za pośrednictwem Model.fit , który obsługuje pętlę treningową pod maską, z elastycznością nadpisywania train_step i wywołaniami zwrotnymi, które zapewniają takie funkcje, jak zapisywanie punktów kontrolnych lub zapisywanie podsumowań dla TensorBoard. Dzięki Model.fit ten sam kod szkoleniowy może być używany dla innych strategii z prostą zamianą obiektu strategii.

Dane wejściowe

Model.fit z uczeniem serwera parametrów wymaga, aby dane wejściowe były dostarczane w wywoływanej, która przyjmuje pojedynczy argument typu tf.distribute.InputContext i zwraca tf.data.Dataset . Następnie utwórz obiekt tf.keras.utils.experimental.DatasetCreator , który przyjmuje taki obiekt callable i opcjonalny obiekt tf.distribute.InputOptions za pośrednictwem argumentu input_options .

Zwróć uwagę, że zaleca się tasowanie i powtarzanie danych przy uczeniu serwera parametrów i określanie steps_per_epoch w wywołaniu fit , aby biblioteka znała granice epoki.

Zapoznaj się z samouczkiem dotyczącym wprowadzania rozproszonego , aby uzyskać więcej informacji na temat argumentu InputContext .

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

Kod w dataset_fn zostanie wywołany na urządzeniu wejściowym, którym zwykle jest procesor, na każdym z komputerów roboczych.

Budowa i kompilacja modeli

Teraz utworzysz tf.keras.Model — trywialny model tf.keras.models.Sequential do celów demonstracyjnych — po którym nastąpi wywołanie Model.compile w celu włączenia komponentów, takich jak optymalizator, metryki lub parametry, takie jak steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Oddzwonienia i szkolenia

Zanim wywołasz model.fit do rzeczywistego szkolenia, przygotujmy potrzebne wywołania zwrotne dla typowych zadań, takich jak:

  • ModelCheckpoint : aby zapisać wagi modelu.
  • BackupAndRestore : aby upewnić się, że postęp szkolenia jest automatycznie zapisywany w kopii zapasowej i odzyskiwany w przypadku niedostępności klastra (np. przerwanie lub wywłaszczenie); lub
  • TensorBoard : do zapisywania raportów postępu w plikach podsumowujących, które są wizualizowane w narzędziu TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f89984ca890>

Bezpośrednie użycie z ClusterCoordinator (opcjonalnie)

Nawet jeśli wybierzesz ścieżkę szkoleniową Model.fit , możesz opcjonalnie utworzyć wystąpienie obiektu tf.distribute.experimental.coordinator.ClusterCoordinator , aby zaplanować inne funkcje, które chcesz wykonać na pracownikach. Zobacz sekcję Trening z niestandardową pętlą treningową, aby uzyskać więcej szczegółów i przykładów.

Trening z niestandardową pętlą treningową

Używanie niestandardowych pętli treningowych z tf.distribute.Strategy zapewnia dużą elastyczność w definiowaniu pętli treningowych. Z ParameterServerStrategy zdefiniowanym powyżej (jako strategy ), użyjesz tf.distribute.experimental.coordinator.ClusterCoordinator , aby wysłać wykonanie kroków szkoleniowych do pracowników zdalnych.

Następnie utworzysz model, zdefiniujesz zbiór danych i funkcję kroku, tak jak to zrobiłeś w pętli szkoleniowej z innymi tf.distribute.Strategy . Więcej szczegółów można znaleźć w samouczku tf.distribute.Strategy .

Aby zapewnić wydajne pobieranie zestawu danych z wyprzedzeniem, użyj zalecanych interfejsów API tworzenia rozproszonych zestawów danych wymienionych w poniższej sekcji Kroki szkolenia dotyczące wysyłania do pracowników zdalnych . Pamiętaj też, aby wywołać Strategy.run wewnątrz worker_fn , aby w pełni wykorzystać procesory GPU przydzielone pracownikom. Pozostałe kroki są takie same w przypadku treningu z procesorami graficznymi lub bez nich.

Stwórzmy te komponenty w następujących krokach:

Skonfiguruj dane

Najpierw napisz funkcję, która tworzy zestaw danych zawierający logikę przetwarzania wstępnego zaimplementowaną przez warstwy przetwarzania wstępnego Keras .

Utworzysz te warstwy poza dataset_fn , ale zastosujesz przekształcenie wewnątrz dataset_fn , ponieważ zapakujesz dataset_fn w tf.function , który nie pozwala na tworzenie w nim zmiennych.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return bool(asarray(a1 == a2).all())

Wygeneruj przykłady zabawek w zbiorze danych:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Następnie utwórz treningowy zestaw danych opakowany w dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Zbuduj model

Następnie utwórz model i inne obiekty. Pamiętaj, aby utworzyć wszystkie zmienne w strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

Potwierdźmy, że użycie FixedShardsPartitioner podzieliło wszystkie zmienne na dwa shardy, a każdy shard został przypisany do różnych serwerów parametrów:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Zdefiniuj etap szkolenia

Po trzecie, utwórz krok treningowy opakowany w tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

W powyższej funkcji szkolenia step, wywoływanie Strategy.run i Strategy.reduce w step_fn może obsługiwać wiele procesorów GPU na pracownika. Jeśli pracownicy mają przydzielone procesory graficzne, Strategy.run rozprowadzi zestawy danych w wielu replikach.

Wyślij kroki szkoleniowe do pracowników zdalnych

Po zdefiniowaniu wszystkich obliczeń przez ParameterServerStrategy użyjesz klasy tf.distribute.experimental.coordinator.ClusterCoordinator do tworzenia zasobów i dystrybucji kroków szkolenia do pracowników zdalnych.

Utwórzmy najpierw obiekt ClusterCoordinator i przekażmy obiekt strategii:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Następnie utwórz zestaw danych na pracownika i iterator. W instrukcji per_worker_dataset_fn poniżej zaleca się umieszczenie zestawu dataset_fn w strategy.distribute_datasets_from_function , aby umożliwić bezproblemowe pobieranie z wyprzedzeniem do procesorów graficznych.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

Ostatnim krokiem jest dystrybucja obliczeń do pracowników zdalnych za pomocą ClusterCoordinator.schedule :

  • Metoda schedule umieszcza w tf.function i natychmiast zwraca wartość RemoteValue podobną do przyszłości. Funkcje umieszczone w kolejce zostaną wysłane do pracowników zdalnych w wątkach w tle, a wartość RemoteValue zostanie wypełniona asynchronicznie.
  • Metoda join ( ClusterCoordinator.join ) może służyć do oczekiwania na wykonanie wszystkich zaplanowanych funkcji.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.543750.
Finished epoch 1, accuracy is 0.543750.
Finished epoch 2, accuracy is 0.950000.
Finished epoch 3, accuracy is 1.000000.

Oto jak możesz pobrać wynik RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

Alternatywnie możesz uruchomić wszystkie kroki i zrobić coś podczas oczekiwania na zakończenie:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Aby zapoznać się z pełnym przebiegiem szkolenia i udostępniania dla tego konkretnego przykładu, zapoznaj się z tym testem .

Więcej o tworzeniu zbiorów danych

Zestaw danych w powyższym kodzie jest tworzony przy użyciu interfejsu API ClusterCoordinator.create_per_worker_dataset ). Tworzy jeden zestaw danych na pracownika i zwraca obiekt kontenera. Możesz wywołać na nim metodę iter , aby utworzyć iterator na proces roboczy. Iterator na pracownika zawiera jeden iterator na pracownika, a odpowiedni wycinek pracownika zostanie zastąpiony w argumencie wejściowym funkcji przekazanej do metody ClusterCoordinator.schedule przed wykonaniem funkcji na określonym procesie roboczym.

Obecnie metoda ClusterCoordinator.schedule zakłada, że ​​procesy robocze są równoważne, a zatem zakłada, że ​​zestawy danych na różnych procesach roboczych są takie same, z wyjątkiem tego, że mogą one być przetasowane w inny sposób, jeśli zawierają operację Dataset.shuffle . Z tego powodu zaleca się również powtarzanie zestawów danych w nieskończoność i zaplanowanie skończonej liczby kroków zamiast polegania na OutOfRangeError z zestawu danych.

Inną ważną uwagą jest to, że zestawy danych tf.data nie obsługują niejawnej serializacji i deserializacji między granicami zadań. Dlatego ważne jest, aby utworzyć cały zestaw danych wewnątrz funkcji przekazanej do ClusterCoordinator.create_per_worker_dataset .

Ocena

Istnieje więcej niż jeden sposób zdefiniowania i uruchomienia pętli oceny w szkoleniu rozproszonym. Każdy ma swoje zalety i wady, jak opisano poniżej. Metoda oceny inline jest zalecana, jeśli nie masz preferencji.

Ocena inline

W tej metodzie koordynator naprzemiennie używa szkolenia i ewaluacji i dlatego nazywa się to ewaluacją inline .

Istnieje kilka korzyści z oceny wbudowanej. Na przykład:

  • Może obsługiwać duże modele ewaluacyjne i zestawy danych ewaluacyjnych, których nie może pomieścić pojedyncze zadanie.
  • Wyniki ewaluacji mogą posłużyć do podejmowania decyzji dotyczących szkolenia w następnej epoce.

Istnieją dwa sposoby wdrażania ewaluacji inline: ewaluacja bezpośrednia i ewaluacja rozproszona.

  • Ocena bezpośrednia : w przypadku małych modeli i zestawów danych ewaluacyjnych koordynator może przeprowadzić ewaluację bezpośrednio w modelu rozproszonym z zestawem danych ewaluacyjnych na koordynatorze:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Ocena rozproszona : w przypadku dużych modeli lub zestawów danych, których uruchomienie bezpośrednio na koordynatorze jest niemożliwe, zadanie koordynatora może rozdzielić zadania oceny do pracowników za pomocą metod ClusterCoordinator.schedule / ClusterCoordinator.join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

Ocena samochodu bocznego

Inna metoda nazywana jest oceną samochodu bocznego, w której tworzysz dedykowane zadanie oceniające, które wielokrotnie odczytuje punkty kontrolne i uruchamia ocenę ostatniego punktu kontrolnego. Pozwala to na wcześniejsze zakończenie programu treningowego, jeśli nie musisz zmieniać pętli treningowej na podstawie wyników oceny. Wymaga to jednak dodatkowego zadania ewaluatora i okresowego sprawdzania punktów kontrolnych, aby wyzwolić ocenę. Poniżej znajduje się możliwa pętla oceny wózka bocznego:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Klastry w świecie rzeczywistym

W rzeczywistym środowisku produkcyjnym uruchomisz wszystkie zadania w różnych procesach na różnych maszynach. Najprostszym sposobem skonfigurowania informacji o klastrze dla każdego zadania jest ustawienie zmiennych środowiskowych "TF_CONFIG" i użycie tf.distribute.cluster_resolver.TFConfigClusterResolver do przeanalizowania "TF_CONFIG" .

Aby uzyskać ogólny opis zmiennych środowiskowych "TF_CONFIG" , zapoznaj się z przewodnikiem dotyczącym szkoleń rozproszonych .

Jeśli rozpoczynasz zadania szkoleniowe przy użyciu Kubernetes lub innych szablonów konfiguracji, jest bardzo prawdopodobne, że te szablony mają już ustawione dla Ciebie “TF_CONFIG" .

Ustaw zmienną środowiskową "TF_CONFIG"

Załóżmy, że masz 3 procesy robocze i 2 serwery parametrów, "TF_CONFIG" 1 może mieć postać:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

"TF_CONFIG" oceniającego może być:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

Część "cluster" w powyższym "TF_CONFIG" dla oceniającego jest opcjonalna.

Jeśli używasz tego samego pliku binarnego do wszystkich zadań

Jeśli wolisz uruchamiać wszystkie te zadania za pomocą jednego pliku binarnego, na samym początku musisz pozwolić swojemu programowi rozgałęziać się na różne role:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

Poniższy kod uruchamia serwer TensorFlow i czeka:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Obsługa niepowodzenia zadania

Awaria pracownika

tf.distribute.experimental.coordinator.ClusterCoordinator lub Model.fit zapewniają wbudowaną odporność na błędy w przypadku awarii pracownika. Po odzyskaniu elementu roboczego wcześniej udostępniona funkcja zestawu danych (albo ClusterCoordinator.create_per_worker_dataset w przypadku niestandardowej pętli szkoleniowej, albo tf.keras.utils.experimental.DatasetCreator dla Model.fit ) zostanie wywołana na elementach roboczych w celu ponownego utworzenia zestawów danych.

Awaria serwera parametrów lub koordynatora

Jednak gdy koordynator zobaczy błąd serwera parametrów, natychmiast zgłosi błąd UnavailableError lub AbortedError . W takim przypadku możesz ponownie uruchomić koordynatora. Sam koordynator również może stać się niedostępny. Dlatego zalecane jest odpowiednie oprzyrządowanie, aby nie stracić postępów w treningu:

  • W przypadku Model.fit należy użyć wywołania zwrotnego BackupAndRestore , które automatycznie obsługuje zapisywanie i przywracanie postępów. Zobacz na przykład sekcję Callbacks i szkolenia powyżej.

  • W przypadku niestandardowej pętli szkoleniowej należy okresowo sprawdzać zmienne modelu i załadować zmienne modelu z punktu kontrolnego, jeśli istnieje, przed rozpoczęciem szkolenia. Postęp szkolenia można wywnioskować w przybliżeniu na podstawie pliku optimizer.iterations , jeśli optymalizator jest w punkcie kontrolnym:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Pobieranie wartości RemoteValue

Pobranie wartości RemoteValue ma gwarancję powodzenia, jeśli funkcja zostanie pomyślnie wykonana. Dzieje się tak, ponieważ obecnie wartość zwracana jest natychmiast kopiowana do koordynatora po wykonaniu funkcji. Jeśli podczas kopiowania wystąpi awaria pracownika, funkcja zostanie ponowiona na innym dostępnym pracowniku. Dlatego jeśli chcesz zoptymalizować wydajność, możesz zaplanować funkcje bez zwracanej wartości.

Zgłaszanie błędów

Gdy koordynator zobaczy błąd, taki jak UnavailableError z serwerów parametrów lub inne błędy aplikacji, takie jak InvalidArgument z tf.debugging.check_numerics , anuluje wszystkie oczekujące i umieszczone w kolejce funkcje przed zgłoszeniem błędu. Pobranie odpowiadających im RemoteValue spowoduje zgłoszenie CancelledError .

Po zgłoszeniu błędu koordynator nie zgłosi tego samego błędu ani żadnego błędu z anulowanych funkcji.

Poprawa wydajności

Istnieje kilka możliwych przyczyn wystąpienia problemów z wydajnością podczas trenowania za pomocą ParameterServerStrategy i ClusterResolver .

Jedną z częstych przyczyn jest to, że serwery parametrów mają niezrównoważone obciążenie, a niektóre mocno obciążone serwery parametrów osiągnęły pojemność. Przyczyn może być również wiele. Niektóre proste metody złagodzenia tego problemu to:

  1. Podziel na fragmenty duże zmienne modelu, określając variable_partitioner podczas konstruowania ParameterServerStrategy .
  2. Jeśli to możliwe, unikaj tworzenia zmiennej hotspotu wymaganej przez wszystkie serwery parametrów w jednym kroku. Na przykład użyj stałej szybkości uczenia się lub podklasy tf.keras.optimizers.schedules.LearningRateSchedule w optymalizatorach, ponieważ domyślne zachowanie polega na tym, że szybkość uczenia się stanie się zmienną umieszczaną na określonym serwerze parametrów i żądaną przez wszystkie inne serwery parametrów w każdym kroku .
  3. Potasuj swoje duże słowniki przed przekazaniem ich do warstw przetwarzania wstępnego Keras.

Inną możliwą przyczyną problemów z wydajnością jest koordynator. Twoja pierwsza implementacja schedule / join jest oparta na Pythonie i dlatego może mieć narzut związany z wątkami. Również opóźnienie między koordynatorem a pracownikami może być duże. Jeżeli o to chodzi,

  • Dla Model.fit można ustawić argument steps_per_execution dostarczony w Model.compile na wartość większą niż 1.

  • W przypadku niestandardowej pętli treningowej możesz spakować wiele kroków w jedną tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Ponieważ biblioteka jest dalej optymalizowana, miejmy nadzieję, że większość użytkowników nie będzie musiała ręcznie pakować kroków w przyszłości.

Ponadto małą sztuczką w celu poprawy wydajności jest zaplanowanie funkcji bez wartości zwracanej, jak wyjaśniono w sekcji dotyczącej niepowodzenia zadania obsługi powyżej.

Znane ograniczenia

Większość znanych ograniczeń została już omówiona w powyższych sekcjach. Ta sekcja zawiera podsumowanie.

ParameterServerStrategy ogólne

  • os.environment["grpc_fail_fast"]="use_caller" jest potrzebny w każdym zadaniu, w tym koordynatorze, aby odporność na awarie działała prawidłowo.
  • Uczenie serwera parametrów synchronicznych nie jest obsługiwane.
  • Zwykle konieczne jest spakowanie wielu kroków w jedną funkcję, aby osiągnąć optymalną wydajność.
  • Nie jest obsługiwane ładowanie save_model za pośrednictwem tf.saved_model.load zawierającego zmienne podzielone na fragmenty. Uwaga: ładowanie takiego zapisanego modelu przy użyciu TensorFlow Serving powinno działać.
  • Nie jest obsługiwane ładowanie punktu kontrolnego zawierającego zmienne slotu optymalizatora podzielonego na fragmenty do innej liczby fragmentów.
  • Nie jest obsługiwane odzyskiwanie po awarii serwera parametrów bez restartowania zadania koordynatora.
  • Użycie tf.lookup.StaticHashTable (który jest powszechnie używany przez niektóre warstwy przetwarzania wstępnego Keras, takie jak tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookup i tf.keras.layers.TextVectorization ) powoduje umieszczenie zasobów na koordynator w tym czasie ze szkoleniem serwera parametrów. Ma to wpływ na wydajność wyszukiwania RPC od pracowników do koordynatora. Jest to obecnie wysoki priorytet do rozwiązania.

Specyfika Model.fit

  • W Model.fit wymagany jest argument steps_per_epoch . Możesz wybrać wartość, która zapewnia odpowiednie odstępy w epoce.
  • ParameterServerStrategy nie obsługuje niestandardowych wywołań zwrotnych, które mają wywołania na poziomie wsadowym ze względu na wydajność. Powinieneś przekonwertować te wywołania na wywołania na poziomie epoki z odpowiednio steps_per_epoch , tak aby były nazywane każdą liczbą kroków steps_per_epoch . Nie ma to wpływu na wbudowane wywołania zwrotne: ich wywołania na poziomie wsadu zostały zmodyfikowane tak, aby były wydajne. Planowana jest obsługa wywołań na poziomie partii dla ParameterServerStrategy .
  • Z tego samego powodu, w przeciwieństwie do innych strategii, pasek postępu i metryki są rejestrowane tylko na granicach epok.
  • run_eagerly nie jest obsługiwany.

Specyfika niestandardowej pętli treningowej