Zobacz na TensorFlow.org | Uruchom w Google Colab | Wyświetl źródło na GitHub | Pobierz notatnik |
Przegląd
Uczenie serwera parametrów to powszechna metoda równoległości danych służąca do skalowania uczenia modelu w górę na wielu komputerach.
Klaster uczący serwera parametrów składa się z procesów roboczych i serwerów parametrów . Zmienne są tworzone na serwerach parametrów i na każdym kroku są odczytywane i aktualizowane przez pracowników. Domyślnie pracownicy odczytują i aktualizują te zmienne niezależnie bez wzajemnej synchronizacji. Dlatego czasami uczenie parametrów w stylu serwera jest nazywane uczeniem asynchronicznym .
W TensorFlow 2 uczenie serwera parametrów jest obsługiwane przez klasę tf.distribute.experimental.ParameterServerStrategy
, która dystrybuuje kroki uczenia do klastra skalowanego do tysięcy pracowników (wraz z serwerami parametrów).
Obsługiwane metody szkoleniowe
Istnieją dwie główne wspierane metody szkoleniowe:
- Keras
Model.fit
API, który jest zalecany, gdy preferujesz abstrakcję wysokiego poziomu i obsługę szkolenia. - Niestandardowa pętla treningowa (więcej szczegółów można znaleźć w sekcji Trening niestandardowy , Pisanie pętli treningowej od podstaw i Niestandardowa pętla treningowa za pomocą Keras i MultiWorkerMirroredStrategy ). Trening w niestandardowej pętli jest zalecany, gdy wolisz zdefiniować szczegóły ich pętli treningowych.
Klaster z zadaniami i zadaniami
Niezależnie od wybranego interfejsu API ( Model.fit
lub niestandardowa pętla szkoleniowa), szkolenie rozproszone w TensorFlow 2 obejmuje: 'cluster'
z kilkoma 'jobs'
, a każde z zadań może mieć jedno lub więcej 'tasks'
.
W przypadku korzystania z uczenia serwera parametrów zaleca się posiadanie:
- Jedno stanowisko koordynatora (które ma stanowisko
chief
) - Wiele miejsc pracy (nazwa stanowiska
worker
); oraz - Zadania serwera z wieloma parametrami (nazwa zadania
ps
)
Podczas gdy koordynator tworzy zasoby, rozsyła zadania szkoleniowe, zapisuje punkty kontrolne i zajmuje się niepowodzeniem zadań, procesy robocze i serwery parametrów uruchamiają tf.distribute.Server
, który nasłuchuje żądań od koordynatora.
Trening parametrów serwera za pomocą Model.fit
API
Uczenie serwera parametrów za pomocą interfejsu API Model.fit
wymaga od koordynatora użycia obiektu tf.distribute.experimental.ParameterServerStrategy
oraz obiektu tf.keras.utils.experimental.DatasetCreator
jako danych wejściowych. Podobnie jak w przypadku użycia Model.fit
bez strategii lub z innymi strategiami, przepływ pracy obejmuje tworzenie i kompilację modelu, przygotowanie wywołań zwrotnych, a następnie wywołanie Model.fit
.
Trening parametrów serwera z niestandardową pętlą treningową
W przypadku niestandardowych pętli szkoleniowych klasa tf.distribute.experimental.coordinator.ClusterCoordinator
jest kluczowym komponentem używanym przez koordynatora.
- Klasa
ClusterCoordinator
musi działać w połączeniu z obiektemtf.distribute.Strategy
. - Ten obiekt
tf.distribute.Strategy
jest potrzebny do dostarczania informacji o klastrze i służy do definiowania kroku szkolenia, jak pokazano w Szkoleniu niestandardowym za pomocą tf.distribute.Strategy . - Następnie obiekt
ClusterCoordinator
wysyła wykonanie tych kroków szkoleniowych do pracowników zdalnych. - W przypadku uczenia serwera parametrów
ClusterCoordinator
musi współpracować ztf.distribute.experimental.ParameterServerStrategy
.
Najważniejszym API udostępnianym przez obiekt ClusterCoordinator
jest schedule
:
-
schedule
API umieszcza w kolejce funkcjętf.function
i natychmiast zwraca wartośćRemoteValue
podobną do przyszłości. - Funkcje umieszczone w kolejce zostaną wysłane do pracowników zdalnych w wątkach w tle, a ich
RemoteValue
zostaną wypełnione asynchronicznie. - Ponieważ
schedule
nie wymaga przypisania pracownika, przekazanatf.function
może zostać wykonana na dowolnym dostępnym pracowniku. - Jeśli pracownik, na którym jest wykonywana, stanie się niedostępny przed jej zakończeniem, funkcja zostanie ponowiona na innym dostępnym pracowniku.
- Ze względu na ten fakt oraz fakt, że wykonanie funkcji nie jest niepodzielne, funkcja może być wykonana więcej niż raz.
Oprócz rozsyłania funkcji zdalnych ClusterCoordinator
pomaga również w tworzeniu zestawów danych dotyczących wszystkich pracowników i odbudowie tych zestawów danych, gdy pracownik odzyskuje siły po awarii.
Konfiguracja samouczka
Samouczek rozgałęzia się na Model.fit
i niestandardowe ścieżki pętli treningowych, a Ty możesz wybrać tę, która odpowiada Twoim potrzebom. Sekcje inne niż „Trening z X” mają zastosowanie do obu ścieżek.
pip install portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
Konfiguracja klastra
Jak wspomniano powyżej, klaster szkoleniowy serwera parametrów wymaga zadania koordynatora, które uruchamia program szkoleniowy, jednego lub kilku procesów roboczych i zadań serwera parametrów, które uruchamiają serwery TensorFlow — tf.distribute.Server
— i ewentualnie dodatkowego zadania ewaluacyjnego, które uruchamia ewaluację boczną (patrz sekcja dotycząca oceny wózka bocznego poniżej). Wymagania do ich założenia to:
- Zadanie koordynatora musi znać adresy i porty wszystkich innych serwerów TensorFlow z wyjątkiem oceniającego.
- Pracownicy i serwery parametrów muszą wiedzieć, na którym porcie mają nasłuchiwać. Dla uproszczenia zazwyczaj można przekazać pełne informacje o klastrze podczas tworzenia serwerów TensorFlow do tych zadań.
- Zadanie oceniającego nie musi znać konfiguracji klastra szkoleniowego. Jeśli tak, nie powinien próbować nawiązać połączenia z klastrem szkoleniowym.
- Procesy robocze i serwery parametrów powinny mieć typy zadań odpowiednio jako
"worker"
i"ps"
. Koordynator powinien używać"chief"
jako typu zadania ze względów historycznych.
W tym samouczku utworzysz klaster wewnątrzprocesowy, aby całe szkolenie dotyczące serwera parametrów można było uruchomić w Colab. W dalszej części dowiesz się, jak skonfigurować prawdziwe klastry .
Klaster w procesie
Zaczniesz od utworzenia kilku serwerów TensorFlow z wyprzedzeniem i połączysz się z nimi później. Zwróć uwagę, że jest to tylko do celów demonstracyjnych tego samouczka, aw prawdziwym szkoleniu serwery będą uruchamiane na maszynach "worker"
i "ps"
.
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name="worker",
task_index=i,
config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec,
job_name="ps",
task_index=i,
protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
Konfiguracja klastra w procesie jest często używana w testach jednostkowych, takich jak tutaj .
Inną opcją testowania lokalnego jest uruchamianie procesów na komputerze lokalnym — zobacz Szkolenie dla wielu pracowników z Keras , aby zapoznać się z przykładem takiego podejścia.
Utwórz wystąpienie ParameterServerStrategy
Zanim zagłębisz się w kod szkoleniowy, stwórzmy wystąpienie obiektu ParameterServerStrategy
. Pamiętaj, że jest to potrzebne niezależnie od tego, czy korzystasz z Model.fit
, czy z niestandardową pętlą treningową. Argument variable_partitioner
zostanie wyjaśniony w sekcji Variable sharding .
variable_partitioner = (
tf.distribute.experimental.partitioners.MinSizePartitioner(
min_shard_bytes=(256 << 10),
max_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0' INFO:tensorflow:Number of GPUs on workers: 1
Aby używać procesorów GPU do szkolenia, przydziel procesory GPU widoczne dla każdego pracownika. ParameterServerStrategy
użyje wszystkich dostępnych procesorów GPU na każdym procesie roboczym, z zastrzeżeniem, że wszyscy pracownicy powinni mieć taką samą liczbę dostępnych procesorów GPU.
Zmienna sharding
Fragmentacja zmiennych odnosi się do dzielenia zmiennej na wiele mniejszych zmiennych, które są nazywane shards . Fragmentowanie zmiennych może być przydatne do rozłożenia obciążenia sieci podczas uzyskiwania dostępu do tych fragmentów. Przydatne jest również rozdzielenie obliczeń i przechowywania normalnej zmiennej na wiele serwerów parametrów.
Aby włączyć dzielenie na fragmenty zmiennych, można przekazać variable_partitioner
podczas konstruowania obiektu ParameterServerStrategy
. variable_partitioner
będzie wywoływany za każdym razem, gdy tworzona jest zmienna i oczekuje się, że zwróci liczbę fragmentów wzdłuż każdego wymiaru zmiennej. Niektóre out-of-box variable_partitioner
s są dostarczane, takie jak tf.distribute.experimental.partitioners.MinSizePartitioner
. Zaleca się używanie partycjonatorów opartych na rozmiarze, takich jak tf.distribute.experimental.partitioners.MinSizePartitioner
, aby uniknąć partycjonowania małych zmiennych, które mogą mieć negatywny wpływ na szybkość uczenia modelu.
Po przekazaniu variable_partitioner
i jeśli utworzysz zmienną bezpośrednio w strategy.scope()
, stanie się ona typem kontenera z właściwością variables
, która zapewnia dostęp do listy fragmentów. W większości przypadków ten kontener zostanie automatycznie przekonwertowany na tensor poprzez połączenie wszystkich odłamków. W rezultacie może być używana jako normalna zmienna. Z drugiej strony, niektóre metody TensorFlow, takie jak tf.nn.embedding_lookup
, zapewniają wydajną implementację dla tego typu kontenera i w tych metodach można uniknąć automatycznego łączenia.
Więcej informacji można znaleźć w dokumentacji interfejsu API tf.distribute.experimental.ParameterServerStrategy
.
Trening z Model.fit
Keras zapewnia łatwy w użyciu interfejs API treningowy za pośrednictwem Model.fit
, który obsługuje pętlę treningową pod maską, z elastycznością nadpisywania train_step
i wywołaniami zwrotnymi, które zapewniają takie funkcje, jak zapisywanie punktów kontrolnych lub zapisywanie podsumowań dla TensorBoard. Dzięki Model.fit
ten sam kod szkoleniowy może być używany dla innych strategii z prostą zamianą obiektu strategii.
Dane wejściowe
Model.fit
z uczeniem serwera parametrów wymaga, aby dane wejściowe były dostarczane w wywoływanej, która przyjmuje pojedynczy argument typu tf.distribute.InputContext
i zwraca tf.data.Dataset
. Następnie utwórz obiekt tf.keras.utils.experimental.DatasetCreator
, który przyjmuje taki obiekt callable
i opcjonalny obiekt tf.distribute.InputOptions
za pośrednictwem argumentu input_options
.
Zwróć uwagę, że zaleca się tasowanie i powtarzanie danych przy uczeniu serwera parametrów i określanie steps_per_epoch
w wywołaniu fit
, aby biblioteka znała granice epoki.
Zapoznaj się z samouczkiem dotyczącym wprowadzania rozproszonego , aby uzyskać więcej informacji na temat argumentu InputContext
.
def dataset_fn(input_context):
global_batch_size = 64
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.shard(
input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2)
return dataset
dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
Kod w dataset_fn
zostanie wywołany na urządzeniu wejściowym, którym zwykle jest procesor, na każdym z komputerów roboczych.
Budowa i kompilacja modeli
Teraz utworzysz tf.keras.Model
— trywialny model tf.keras.models.Sequential
do celów demonstracyjnych — po którym nastąpi wywołanie Model.compile
w celu włączenia komponentów, takich jak optymalizator, metryki lub parametry, takie jak steps_per_execution
:
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)
Oddzwonienia i szkolenia
Zanim wywołasz model.fit
do rzeczywistego szkolenia, przygotujmy potrzebne wywołania zwrotne dla typowych zadań, takich jak:
-
ModelCheckpoint
: aby zapisać wagi modelu. -
BackupAndRestore
: aby upewnić się, że postęp szkolenia jest automatycznie zapisywany w kopii zapasowej i odzyskiwany w przypadku niedostępności klastra (np. przerwanie lub wywłaszczenie); lub -
TensorBoard
: do zapisywania raportów postępu w plikach podsumowujących, które są wizualizowane w narzędziu TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=log_dir),
tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step Epoch 2/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step Epoch 3/5 WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. 20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step Epoch 4/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step Epoch 5/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step <keras.callbacks.History at 0x7f89984ca890>
Bezpośrednie użycie z ClusterCoordinator
(opcjonalnie)
Nawet jeśli wybierzesz ścieżkę szkoleniową Model.fit
, możesz opcjonalnie utworzyć wystąpienie obiektu tf.distribute.experimental.coordinator.ClusterCoordinator
, aby zaplanować inne funkcje, które chcesz wykonać na pracownikach. Zobacz sekcję Trening z niestandardową pętlą treningową, aby uzyskać więcej szczegółów i przykładów.
Trening z niestandardową pętlą treningową
Używanie niestandardowych pętli treningowych z tf.distribute.Strategy
zapewnia dużą elastyczność w definiowaniu pętli treningowych. Z ParameterServerStrategy
zdefiniowanym powyżej (jako strategy
), użyjesz tf.distribute.experimental.coordinator.ClusterCoordinator
, aby wysłać wykonanie kroków szkoleniowych do pracowników zdalnych.
Następnie utworzysz model, zdefiniujesz zbiór danych i funkcję kroku, tak jak to zrobiłeś w pętli szkoleniowej z innymi tf.distribute.Strategy
. Więcej szczegółów można znaleźć w samouczku tf.distribute.Strategy .
Aby zapewnić wydajne pobieranie zestawu danych z wyprzedzeniem, użyj zalecanych interfejsów API tworzenia rozproszonych zestawów danych wymienionych w poniższej sekcji Kroki szkolenia dotyczące wysyłania do pracowników zdalnych . Pamiętaj też, aby wywołać Strategy.run
wewnątrz worker_fn
, aby w pełni wykorzystać procesory GPU przydzielone pracownikom. Pozostałe kroki są takie same w przypadku treningu z procesorami graficznymi lub bez nich.
Stwórzmy te komponenty w następujących krokach:
Skonfiguruj dane
Najpierw napisz funkcję, która tworzy zestaw danych zawierający logikę przetwarzania wstępnego zaimplementowaną przez warstwy przetwarzania wstępnego Keras .
Utworzysz te warstwy poza dataset_fn
, ale zastosujesz przekształcenie wewnątrz dataset_fn
, ponieważ zapakujesz dataset_fn
w tf.function
, który nie pozwala na tworzenie w nim zmiennych.
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=feature_vocab,
mask_token=None)
label_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = tf.keras.layers.Input(
shape=(3,),
dtype=tf.string,
name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = tf.keras.Model(
{"features": raw_feature_input},
feature_id_input)
raw_label_input = tf.keras.layers.Input(
shape=(1,),
dtype=tf.string,
name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = tf.keras.Model(
{"label": raw_label_input},
label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison return bool(asarray(a1 == a2).all())
Wygeneruj przykłady zabawek w zbiorze danych:
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
Następnie utwórz treningowy zestaw danych opakowany w dataset_fn
:
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
Zbuduj model
Następnie utwórz model i inne obiekty. Pamiętaj, aby utworzyć wszystkie zmienne w strategy.scope
.
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with Keras processing layers.
model_input = tf.keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = tf.keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
model = tf.keras.Model({"features": model_input}, dense_output)
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
accuracy = tf.keras.metrics.Accuracy()
Potwierdźmy, że użycie FixedShardsPartitioner
podzieliło wszystkie zmienne na dwa shardy, a każdy shard został przypisany do różnych serwerów parametrów:
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
Zdefiniuj etap szkolenia
Po trzecie, utwórz krok treningowy opakowany w tf.function
:
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
W powyższej funkcji szkolenia step, wywoływanie Strategy.run
i Strategy.reduce
w step_fn
może obsługiwać wiele procesorów GPU na pracownika. Jeśli pracownicy mają przydzielone procesory graficzne, Strategy.run
rozprowadzi zestawy danych w wielu replikach.
Wyślij kroki szkoleniowe do pracowników zdalnych
Po zdefiniowaniu wszystkich obliczeń przez ParameterServerStrategy
użyjesz klasy tf.distribute.experimental.coordinator.ClusterCoordinator
do tworzenia zasobów i dystrybucji kroków szkolenia do pracowników zdalnych.
Utwórzmy najpierw obiekt ClusterCoordinator
i przekażmy obiekt strategii:
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
Następnie utwórz zestaw danych na pracownika i iterator. W instrukcji per_worker_dataset_fn
poniżej zaleca się umieszczenie zestawu dataset_fn
w strategy.distribute_datasets_from_function
, aby umożliwić bezproblemowe pobieranie z wyprzedzeniem do procesorów graficznych.
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Ostatnim krokiem jest dystrybucja obliczeń do pracowników zdalnych za pomocą ClusterCoordinator.schedule
:
- Metoda
schedule
umieszcza wtf.function
i natychmiast zwraca wartośćRemoteValue
podobną do przyszłości. Funkcje umieszczone w kolejce zostaną wysłane do pracowników zdalnych w wątkach w tle, a wartośćRemoteValue
zostanie wypełniona asynchronicznie. - Metoda
join
(ClusterCoordinator.join
) może służyć do oczekiwania na wykonanie wszystkich zaplanowanych funkcji.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). Finished epoch 0, accuracy is 0.543750. Finished epoch 1, accuracy is 0.543750. Finished epoch 2, accuracy is 0.950000. Finished epoch 3, accuracy is 1.000000.
Oto jak możesz pobrać wynik RemoteValue
:
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000
Alternatywnie możesz uruchomić wszystkie kroki i zrobić coś podczas oczekiwania na zakończenie:
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
Aby zapoznać się z pełnym przebiegiem szkolenia i udostępniania dla tego konkretnego przykładu, zapoznaj się z tym testem .
Więcej o tworzeniu zbiorów danych
Zestaw danych w powyższym kodzie jest tworzony przy użyciu interfejsu API ClusterCoordinator.create_per_worker_dataset
). Tworzy jeden zestaw danych na pracownika i zwraca obiekt kontenera. Możesz wywołać na nim metodę iter
, aby utworzyć iterator na proces roboczy. Iterator na pracownika zawiera jeden iterator na pracownika, a odpowiedni wycinek pracownika zostanie zastąpiony w argumencie wejściowym funkcji przekazanej do metody ClusterCoordinator.schedule
przed wykonaniem funkcji na określonym procesie roboczym.
Obecnie metoda ClusterCoordinator.schedule
zakłada, że procesy robocze są równoważne, a zatem zakłada, że zestawy danych na różnych procesach roboczych są takie same, z wyjątkiem tego, że mogą one być przetasowane w inny sposób, jeśli zawierają operację Dataset.shuffle
. Z tego powodu zaleca się również powtarzanie zestawów danych w nieskończoność i zaplanowanie skończonej liczby kroków zamiast polegania na OutOfRangeError
z zestawu danych.
Inną ważną uwagą jest to, że zestawy danych tf.data
nie obsługują niejawnej serializacji i deserializacji między granicami zadań. Dlatego ważne jest, aby utworzyć cały zestaw danych wewnątrz funkcji przekazanej do ClusterCoordinator.create_per_worker_dataset
.
Ocena
Istnieje więcej niż jeden sposób zdefiniowania i uruchomienia pętli oceny w szkoleniu rozproszonym. Każdy ma swoje zalety i wady, jak opisano poniżej. Metoda oceny inline jest zalecana, jeśli nie masz preferencji.
Ocena inline
W tej metodzie koordynator naprzemiennie używa szkolenia i ewaluacji i dlatego nazywa się to ewaluacją inline .
Istnieje kilka korzyści z oceny wbudowanej. Na przykład:
- Może obsługiwać duże modele ewaluacyjne i zestawy danych ewaluacyjnych, których nie może pomieścić pojedyncze zadanie.
- Wyniki ewaluacji mogą posłużyć do podejmowania decyzji dotyczących szkolenia w następnej epoce.
Istnieją dwa sposoby wdrażania ewaluacji inline: ewaluacja bezpośrednia i ewaluacja rozproszona.
- Ocena bezpośrednia : w przypadku małych modeli i zestawów danych ewaluacyjnych koordynator może przeprowadzić ewaluację bezpośrednio w modelu rozproszonym z zestawem danych ewaluacyjnych na koordynatorze:
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = tf.keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
- Ocena rozproszona : w przypadku dużych modeli lub zestawów danych, których uruchomienie bezpośrednio na koordynatorze jest niemożliwe, zadanie koordynatora może rozdzielić zadania oceny do pracowników za pomocą metod
ClusterCoordinator.schedule
/ClusterCoordinator.join
:
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = tf.keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources Evaluation accuracy: 1.000000
Ocena samochodu bocznego
Inna metoda nazywana jest oceną samochodu bocznego, w której tworzysz dedykowane zadanie oceniające, które wielokrotnie odczytuje punkty kontrolne i uruchamia ocenę ostatniego punktu kontrolnego. Pozwala to na wcześniejsze zakończenie programu treningowego, jeśli nie musisz zmieniać pętli treningowej na podstawie wyników oceny. Wymaga to jednak dodatkowego zadania ewaluatora i okresowego sprawdzania punktów kontrolnych, aby wyzwolić ocenę. Poniżej znajduje się możliwa pętla oceny wózka bocznego:
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epoches)):
break
Klastry w świecie rzeczywistym
W rzeczywistym środowisku produkcyjnym uruchomisz wszystkie zadania w różnych procesach na różnych maszynach. Najprostszym sposobem skonfigurowania informacji o klastrze dla każdego zadania jest ustawienie zmiennych środowiskowych "TF_CONFIG"
i użycie tf.distribute.cluster_resolver.TFConfigClusterResolver
do przeanalizowania "TF_CONFIG"
.
Aby uzyskać ogólny opis zmiennych środowiskowych "TF_CONFIG"
, zapoznaj się z przewodnikiem dotyczącym szkoleń rozproszonych .
Jeśli rozpoczynasz zadania szkoleniowe przy użyciu Kubernetes lub innych szablonów konfiguracji, jest bardzo prawdopodobne, że te szablony mają już ustawione dla Ciebie “TF_CONFIG"
.
Ustaw zmienną środowiskową "TF_CONFIG"
Załóżmy, że masz 3 procesy robocze i 2 serwery parametrów, "TF_CONFIG"
1 może mieć postać:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
"TF_CONFIG"
oceniającego może być:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
Część "cluster"
w powyższym "TF_CONFIG"
dla oceniającego jest opcjonalna.
Jeśli używasz tego samego pliku binarnego do wszystkich zadań
Jeśli wolisz uruchamiać wszystkie te zadania za pomocą jednego pliku binarnego, na samym początku musisz pozwolić swojemu programowi rozgałęziać się na różne role:
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# Run side-car evaluation
else:
# Run the coordinator.
Poniższy kod uruchamia serwer TensorFlow i czeka:
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
Obsługa niepowodzenia zadania
Awaria pracownika
tf.distribute.experimental.coordinator.ClusterCoordinator
lub Model.fit
zapewniają wbudowaną odporność na błędy w przypadku awarii pracownika. Po odzyskaniu elementu roboczego wcześniej udostępniona funkcja zestawu danych (albo ClusterCoordinator.create_per_worker_dataset
w przypadku niestandardowej pętli szkoleniowej, albo tf.keras.utils.experimental.DatasetCreator
dla Model.fit
) zostanie wywołana na elementach roboczych w celu ponownego utworzenia zestawów danych.
Awaria serwera parametrów lub koordynatora
Jednak gdy koordynator zobaczy błąd serwera parametrów, natychmiast zgłosi błąd UnavailableError
lub AbortedError
. W takim przypadku możesz ponownie uruchomić koordynatora. Sam koordynator również może stać się niedostępny. Dlatego zalecane jest odpowiednie oprzyrządowanie, aby nie stracić postępów w treningu:
W przypadku
Model.fit
należy użyć wywołania zwrotnegoBackupAndRestore
, które automatycznie obsługuje zapisywanie i przywracanie postępów. Zobacz na przykład sekcję Callbacks i szkolenia powyżej.W przypadku niestandardowej pętli szkoleniowej należy okresowo sprawdzać zmienne modelu i załadować zmienne modelu z punktu kontrolnego, jeśli istnieje, przed rozpoczęciem szkolenia. Postęp szkolenia można wywnioskować w przybliżeniu na podstawie pliku
optimizer.iterations
, jeśli optymalizator jest w punkcie kontrolnym:
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epoches):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
Pobieranie wartości RemoteValue
Pobranie wartości RemoteValue
ma gwarancję powodzenia, jeśli funkcja zostanie pomyślnie wykonana. Dzieje się tak, ponieważ obecnie wartość zwracana jest natychmiast kopiowana do koordynatora po wykonaniu funkcji. Jeśli podczas kopiowania wystąpi awaria pracownika, funkcja zostanie ponowiona na innym dostępnym pracowniku. Dlatego jeśli chcesz zoptymalizować wydajność, możesz zaplanować funkcje bez zwracanej wartości.
Zgłaszanie błędów
Gdy koordynator zobaczy błąd, taki jak UnavailableError
z serwerów parametrów lub inne błędy aplikacji, takie jak InvalidArgument
z tf.debugging.check_numerics
, anuluje wszystkie oczekujące i umieszczone w kolejce funkcje przed zgłoszeniem błędu. Pobranie odpowiadających im RemoteValue
spowoduje zgłoszenie CancelledError
.
Po zgłoszeniu błędu koordynator nie zgłosi tego samego błędu ani żadnego błędu z anulowanych funkcji.
Poprawa wydajności
Istnieje kilka możliwych przyczyn wystąpienia problemów z wydajnością podczas trenowania za pomocą ParameterServerStrategy
i ClusterResolver
.
Jedną z częstych przyczyn jest to, że serwery parametrów mają niezrównoważone obciążenie, a niektóre mocno obciążone serwery parametrów osiągnęły pojemność. Przyczyn może być również wiele. Niektóre proste metody złagodzenia tego problemu to:
- Podziel na fragmenty duże zmienne modelu, określając
variable_partitioner
podczas konstruowaniaParameterServerStrategy
. - Jeśli to możliwe, unikaj tworzenia zmiennej hotspotu wymaganej przez wszystkie serwery parametrów w jednym kroku. Na przykład użyj stałej szybkości uczenia się lub podklasy
tf.keras.optimizers.schedules.LearningRateSchedule
w optymalizatorach, ponieważ domyślne zachowanie polega na tym, że szybkość uczenia się stanie się zmienną umieszczaną na określonym serwerze parametrów i żądaną przez wszystkie inne serwery parametrów w każdym kroku . - Potasuj swoje duże słowniki przed przekazaniem ich do warstw przetwarzania wstępnego Keras.
Inną możliwą przyczyną problemów z wydajnością jest koordynator. Twoja pierwsza implementacja schedule
/ join
jest oparta na Pythonie i dlatego może mieć narzut związany z wątkami. Również opóźnienie między koordynatorem a pracownikami może być duże. Jeżeli o to chodzi,
Dla
Model.fit
można ustawić argumentsteps_per_execution
dostarczony wModel.compile
na wartość większą niż 1.W przypadku niestandardowej pętli treningowej możesz spakować wiele kroków w jedną
tf.function
:
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
Ponieważ biblioteka jest dalej optymalizowana, miejmy nadzieję, że większość użytkowników nie będzie musiała ręcznie pakować kroków w przyszłości.
Ponadto małą sztuczką w celu poprawy wydajności jest zaplanowanie funkcji bez wartości zwracanej, jak wyjaśniono w sekcji dotyczącej niepowodzenia zadania obsługi powyżej.
Znane ograniczenia
Większość znanych ograniczeń została już omówiona w powyższych sekcjach. Ta sekcja zawiera podsumowanie.
ParameterServerStrategy
ogólne
-
os.environment["grpc_fail_fast"]="use_caller"
jest potrzebny w każdym zadaniu, w tym koordynatorze, aby odporność na awarie działała prawidłowo. - Uczenie serwera parametrów synchronicznych nie jest obsługiwane.
- Zwykle konieczne jest spakowanie wielu kroków w jedną funkcję, aby osiągnąć optymalną wydajność.
- Nie jest obsługiwane ładowanie save_model za pośrednictwem
tf.saved_model.load
zawierającego zmienne podzielone na fragmenty. Uwaga: ładowanie takiego zapisanego modelu przy użyciu TensorFlow Serving powinno działać. - Nie jest obsługiwane ładowanie punktu kontrolnego zawierającego zmienne slotu optymalizatora podzielonego na fragmenty do innej liczby fragmentów.
- Nie jest obsługiwane odzyskiwanie po awarii serwera parametrów bez restartowania zadania koordynatora.
- Użycie
tf.lookup.StaticHashTable
(który jest powszechnie używany przez niektóre warstwy przetwarzania wstępnego Keras, takie jaktf.keras.layers.IntegerLookup
,tf.keras.layers.StringLookup
itf.keras.layers.TextVectorization
) powoduje umieszczenie zasobów na koordynator w tym czasie ze szkoleniem serwera parametrów. Ma to wpływ na wydajność wyszukiwania RPC od pracowników do koordynatora. Jest to obecnie wysoki priorytet do rozwiązania.
Specyfika Model.fit
- W
Model.fit
wymagany jest argumentsteps_per_epoch
. Możesz wybrać wartość, która zapewnia odpowiednie odstępy w epoce. -
ParameterServerStrategy
nie obsługuje niestandardowych wywołań zwrotnych, które mają wywołania na poziomie wsadowym ze względu na wydajność. Powinieneś przekonwertować te wywołania na wywołania na poziomie epoki z odpowiedniosteps_per_epoch
, tak aby były nazywane każdą liczbą krokówsteps_per_epoch
. Nie ma to wpływu na wbudowane wywołania zwrotne: ich wywołania na poziomie wsadu zostały zmodyfikowane tak, aby były wydajne. Planowana jest obsługa wywołań na poziomie partii dlaParameterServerStrategy
. - Z tego samego powodu, w przeciwieństwie do innych strategii, pasek postępu i metryki są rejestrowane tylko na granicach epok.
-
run_eagerly
nie jest obsługiwany.
Specyfika niestandardowej pętli treningowej
-
ClusterCoordinator.schedule
nie obsługuje gwarancji odwiedzin dla zbioru danych. - Gdy używany jest
ClusterCoordinator.create_per_worker_dataset
, cały zestaw danych musi zostać utworzony wewnątrz przekazanej do niego funkcji. -
tf.data.Options
jest ignorowany w zestawie danych utworzonym przezClusterCoordinator.create_per_worker_dataset
.