Zobacz na TensorFlow.org | Uruchom w Google Colab | Wyświetl źródło na GitHub | Pobierz notatnik |
Przegląd
W tym samouczku przedstawiono szkolenie dla wielu pracowników przy użyciu niestandardowego interfejsu API pętli szkoleniowej, dystrybuowanego za pośrednictwem MultiWorkerMirroredStrategy, dzięki czemu model Keras zaprojektowany do pracy z jednym pracownikiem może bezproblemowo pracować z wieloma pracownikami przy minimalnej zmianie kodu.
Używamy niestandardowych pętli treningowych do trenowania naszego modelu, ponieważ zapewniają nam elastyczność i większą kontrolę nad treningiem. Co więcej, łatwiej jest debugować model i pętlę treningową. Bardziej szczegółowe informacje znajdziesz w Pisanie pętli treningowej od podstaw .
Jeśli szukasz sposobu korzystania z MultiWorkerMirroredStrategy
z keras model.fit
, zapoznaj się z tym samouczkiem .
Przewodnik dotyczący szkoleń rozproszonych w TensorFlow zawiera przegląd strategii dystrybucji, które TensorFlow obsługuje dla osób zainteresowanych głębszym zrozumieniem interfejsów API tf.distribute.Strategy
.
Ustawiać
Po pierwsze, niektóre niezbędne importy.
import json
import os
import sys
Przed zaimportowaniem TensorFlow wprowadź kilka zmian w środowisku.
Wyłącz wszystkie procesory graficzne. Zapobiega to błędom powodowanym przez wszystkich pracowników próbujących korzystać z tego samego GPU. W przypadku rzeczywistej aplikacji każdy pracownik byłby na innej maszynie.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
Zresetuj zmienną środowiskową TF_CONFIG
, więcej o tym dowiesz się później.
os.environ.pop('TF_CONFIG', None)
Upewnij się, że bieżący katalog znajduje się na ścieżce Pythona. Dzięki temu notatnik może później zaimportować pliki zapisane przez %%writefile
.
if '.' not in sys.path:
sys.path.insert(0, '.')
Teraz zaimportuj TensorFlow.
import tensorflow as tf
Zbiór danych i definicja modelu
Następnie utwórz plik mnist.py
z prostą konfiguracją modelu i zestawu danych. Ten plik Pythona będzie używany przez procesy robocze w tym samouczku:
%%writefile mnist.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the range [0, 255].
# You need to convert them to float32 with values in the range [0, 1]
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000)
return train_dataset
def dataset_fn(global_batch_size, input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = mnist_dataset(batch_size)
dataset = dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
return dataset
def build_cnn_model():
return tf.keras.Sequential([
tf.keras.Input(shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
Writing mnist.py
Konfiguracja dla wielu pracowników
Teraz wejdźmy w świat szkoleń wieloosobowych. W TensorFlow zmienna środowiskowa TF_CONFIG
jest wymagana do uczenia na wielu komputerach, z których każdy może mieć inną rolę. TF_CONFIG
użyty poniżej to ciąg JSON używany do określenia konfiguracji klastra na każdym procesie roboczym, który jest częścią klastra. Jest to domyślna metoda określania klastra przy użyciu cluster_resolver.TFConfigClusterResolver
, ale w module distribute.cluster_resolver
są dostępne inne opcje.
Opisz swój klaster
Oto przykładowa konfiguracja:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
Oto ten sam TF_CONFIG
zserializowany jako ciąg JSON:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
Istnieją dwa składniki TF_CONFIG
: cluster
i task
.
cluster
jest taki sam dla wszystkich pracowników i dostarcza informacji o klastrze szkoleniowym, który jest dyktatem składającym się z różnych rodzajów zawodów, takich jakworker
. W szkoleniu dla wielu pracowników zMultiWorkerMirroredStrategy
, zazwyczaj jedenworker
bierze na siebie nieco większą odpowiedzialność, jak zapisywanie punktu kontrolnego i pisanie pliku podsumowującego dla TensorBoard, oprócz tego, co robi zwykływorker
. Taki pracownik jest nazywanychief
pracownikiem, a zwyczajem jest, żeworker
zindex
0 jest mianowany głównymworker
(w rzeczywistości tak realizuje siętf.distribute.Strategy
).task
dostarcza informacji o bieżącym zadaniu i jest inne dla każdego pracownika. Określatype
iindex
tego pracownika.
W tym przykładzie ustawisz type
zadania na "worker"
a index
zadania na 0
. Ta maszyna jest pierwszym pracownikiem i zostanie wyznaczona jako główny pracownik i wykona więcej pracy niż inni. Należy zauważyć, że inne komputery również będą musiały mieć ustawioną zmienną środowiskową TF_CONFIG
, która powinna mieć ten sam dyktat cluster
, ale inny type
zadania lub index
zadania w zależności od ról tych komputerów.
W celach ilustracyjnych, ten samouczek pokazuje, jak można ustawić TF_CONFIG
z 2 procesami roboczymi na localhost
. W praktyce użytkownicy tworzyliby wiele procesów roboczych na zewnętrznych adresach IP/portach i odpowiednio ustawiali TF_CONFIG
na każdym procesie roboczym.
W tym przykładzie użyjesz 2 robotników, TF_CONFIG
pierwszego robotnika pokazano powyżej. Dla drugiego pracownika ustawisz tf_config['task']['index']=1
Powyżej tf_config
jest tylko zmienną lokalną w Pythonie. Aby faktycznie użyć go do skonfigurowania uczenia, ten słownik musi być serializowany jako JSON i umieszczony w zmiennej środowiskowej TF_CONFIG
.
Zmienne środowiskowe i podprocesy w notebookach
Podprocesy dziedziczą zmienne środowiskowe po swoich rodzicach. Więc jeśli ustawisz zmienną środowiskową w tym jupyter notebook
:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
Dostęp do zmiennej środowiskowej można uzyskać z podprocesów:
echo ${GREETINGS}
Hello TensorFlow!
W następnej sekcji użyjesz tego do przekazania TF_CONFIG
do podprocesów roboczych. Nigdy nie uruchamiałbyś swoich zadań w ten sposób, ale to wystarczy do celów tego samouczka: aby zademonstrować minimalny przykład wielu pracowników.
MultiWorkerMirroredStrategy
Aby wytrenować model, użyj wystąpienia tf.distribute.MultiWorkerMirroredStrategy
, które tworzy kopie wszystkich zmiennych w warstwach modelu na każdym urządzeniu we wszystkich procesach roboczych. Przewodnik tf.distribute.Strategy
zawiera więcej szczegółów na temat tej strategii.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO 2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
Użyj tf.distribute.Strategy.scope
, aby określić, że strategia powinna być używana podczas budowania modelu. To umieszcza Cię w „ kontekście między replikami ” dla tej strategii, co oznacza, że strategia ma kontrolę nad takimi rzeczami, jak zmienne rozmieszczenie.
import mnist
with strategy.scope():
# Model building needs to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
Automatycznie dziel dane między pracownikami
W przypadku szkolenia z udziałem wielu pracowników dzielenie zestawu danych na fragmenty niekoniecznie jest konieczne, jednak zapewnia jednorazową semantykę, dzięki czemu szkolenie jest bardziej powtarzalne, tj. szkolenie na wielu pracownikach powinno być takie samo, jak szkolenie na jednym pracowniku. Uwaga: w niektórych przypadkach może to mieć wpływ na wydajność.
Zobacz: distribute_datasets_from_function
per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
with strategy.scope():
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
Zdefiniuj niestandardową pętlę treningową i trenuj model
Określ optymalizator
with strategy.scope():
# The creation of optimizer and train_accuracy will need to be in
# `strategy.scope()` as well, since they create variables.
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
Zdefiniuj krok szkolenia za pomocą tf.function
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
Zapisywanie i przywracanie punktu kontrolnego
Implementacja punktów kontrolnych w niestandardowej pętli szkoleniowej wymaga od użytkownika obsługi tego zamiast używania wywołania zwrotnego Keras. Pozwala na zapisywanie wag modelu i przywracanie ich bez konieczności zapisywania całego modelu.
from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and "chief" not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
Tutaj utworzysz jeden tf.train.Checkpoint
, który śledzi model, który jest zarządzany przez tf.train.CheckpointManager
, dzięki czemu zachowany zostanie tylko najnowszy punkt kontrolny.
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
Teraz, gdy musisz przywrócić, możesz znaleźć najnowszy punkt kontrolny zapisany za pomocą wygodnej funkcji tf.train.latest_checkpoint
.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
Po przywróceniu punktu kontrolnego możesz kontynuować trening własnej pętli treningowej.
num_epochs = 3
num_steps_per_epoch = 70
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
# Once the `CheckpointManager` is set up, you're now ready to save, and remove
# the checkpoints non-chief workers saved.
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.849107, train_loss: 0.491886. Epoch: 1, accuracy: 0.937835, train_loss: 0.197650. Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.
Pełna konfiguracja kodu dla pracowników
Aby faktycznie działać z MultiWorkerMirroredStrategy
, musisz uruchomić procesy robocze i przekazać im TF_CONFIG
.
Podobnie jak napisany wcześniej plik mnist.py
, tutaj jest main.py
, który zawiera ten sam kod, który przeszliśmy krok po kroku wcześniej w tym colab, po prostu zapisujemy go do pliku, aby każdy z pracowników go uruchomił:
Plik: main.py
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist
from multiprocessing import util
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers
num_epochs = 3
num_steps_per_epoch=70
# Checkpoint saving and restoring
def _is_chief(task_type, task_id, cluster_spec):
return (task_type is None
or task_type == 'chief'
or (task_type == 'worker'
and task_id == 0
and 'chief' not in cluster_spec.as_dict()))
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id, cluster_spec):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id, cluster_spec):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')
# Define Strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist.build_cnn_model()
multi_worker_dataset = strategy.distribute_datasets_from_function(
lambda input_context: mnist.dataset_fn(global_batch_size, input_context))
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
name='train_accuracy')
@tf.function
def train_step(iterator):
"""Training step function."""
def step_fn(inputs):
"""Per-Replica step function."""
x, y = inputs
with tf.GradientTape() as tape:
predictions = multi_worker_model(x, training=True)
per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True,
reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
loss = tf.nn.compute_average_loss(
per_batch_loss, global_batch_size=global_batch_size)
grads = tape.gradient(loss, multi_worker_model.trainable_variables)
optimizer.apply_gradients(
zip(grads, multi_worker_model.trainable_variables))
train_accuracy.update_state(y, predictions)
return loss
per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
return strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
initial_value=tf.constant(0, dtype=tf.dtypes.int64),
name='step_in_epoch')
task_type, task_id, cluster_spec = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id,
strategy.cluster_resolver.cluster_spec())
checkpoint = tf.train.Checkpoint(
model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
# Restoring the checkpoint
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
checkpoint.restore(latest_checkpoint)
# Resume our CTL training
while epoch.numpy() < num_epochs:
iterator = iter(multi_worker_dataset)
total_loss = 0.0
num_batches = 0
while step_in_epoch.numpy() < num_steps_per_epoch:
total_loss += train_step(iterator)
num_batches += 1
step_in_epoch.assign_add(1)
train_loss = total_loss / num_batches
print('Epoch: %d, accuracy: %f, train_loss: %f.'
%(epoch.numpy(), train_accuracy.result(), train_loss))
train_accuracy.reset_states()
checkpoint_manager.save()
if not _is_chief(task_type, task_id, cluster_spec):
tf.io.gfile.rmtree(write_checkpoint_dir)
epoch.assign_add(1)
step_in_epoch.assign(0)
Writing main.py
Trenuj i oceniaj
Bieżący katalog zawiera teraz oba pliki Pythona:
ls *.py
main.py mnist.py
Więc json-serializuj TF_CONFIG
i dodaj go do zmiennych środowiskowych:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Teraz możesz uruchomić proces roboczy, który uruchomi main.py
i użyje TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
W powyższym poleceniu należy zwrócić uwagę na kilka rzeczy:
- Używa
%%bash
, który jest "magią" notatnika do uruchamiania niektórych poleceń basha. - Używa flagi
--bg
do uruchomienia procesubash
w tle, ponieważ ten proces roboczy nie zostanie zakończony. Czeka na wszystkich pracowników, zanim się zacznie.
Proces roboczy działający w tle nie wydrukuje danych wyjściowych do tego notatnika, więc &>
przekierowuje dane wyjściowe do pliku, dzięki czemu można zobaczyć, co się stało.
Poczekaj więc kilka sekund, aż proces się uruchomi:
import time
time.sleep(20)
Teraz spójrz, co do tej pory zostało wyprowadzone do pliku dziennika pracownika:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
Ostatnia linia pliku dziennika powinna Started server with target: grpc://localhost:12345
. Pierwszy pracownik jest teraz gotowy i czeka, aż pozostali pracownicy będą gotowi do pracy.
Zaktualizuj więc tf_config
dla procesu drugiego pracownika, aby pobrać:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Teraz uruchom drugiego robotnika. Rozpocznie to szkolenie, ponieważ wszyscy pracownicy są aktywni (więc nie ma potrzeby wprowadzania tego procesu w tle):
python main.py > /dev/null 2>&1
Teraz, jeśli ponownie sprawdzisz dzienniki zapisane przez pierwszego pracownika, zobaczysz, że uczestniczył on w szkoleniu tego modelu:
cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration 2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch: 0, accuracy: 0.832589, train_loss: 0.531260. Epoch: 1, accuracy: 0.936161, train_loss: 0.214774. Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
Dogłębne szkolenie dla wielu pracowników
W tym samouczku zademonstrowano przepływ pracy Custom Training Loop
w konfiguracji wieloosobowej. Szczegółowy opis innych tematów jest dostępny w model.fit's guide
konfiguracji wielu pracowników i ma zastosowanie do CTL.
Zobacz też
- Poradnik dotyczący szkolenia rozproszonego w TensorFlow zawiera przegląd dostępnych strategii dystrybucji.
- Oficjalne modele , z których wiele można skonfigurować do obsługi wielu strategii dystrybucji.
- Sekcja Wydajność w przewodniku zawiera informacje o innych strategiach i narzędziach , których można użyć do optymalizacji wydajności modeli TensorFlow.