Lihat di TensorFlow.org | Jalankan di Google Colab | Lihat sumber di GitHub | Unduh buku catatan |
Ringkasan
Tutorial ini menunjukkan cara melakukan pelatihan terdistribusi multi-pekerja dengan model Keras dan Model.fit
API menggunakan tf.distribute.Strategy
API—khususnya kelas tf.distribute.MultiWorkerMirroredStrategy
. Dengan bantuan strategi ini, model Keras yang dirancang untuk berjalan pada satu pekerja dapat bekerja dengan mulus pada banyak pekerja dengan perubahan kode yang minimal.
Bagi mereka yang tertarik untuk memahami lebih dalam tentang tf.distribute.Strategy
API, pelatihan Terdistribusi dalam panduan TensorFlow tersedia untuk ikhtisar tentang strategi distribusi yang didukung TensorFlow.
Untuk mempelajari cara menggunakan MultiWorkerMirroredStrategy
dengan Keras dan loop pelatihan kustom, lihat Loop pelatihan kustom dengan Keras dan MultiWorkerMirroredStrategy .
Perhatikan bahwa tujuan dari tutorial ini adalah untuk mendemonstrasikan contoh multi-pekerja minimal dengan dua pekerja.
Mempersiapkan
Mulailah dengan beberapa impor yang diperlukan:
import json
import os
import sys
Sebelum mengimpor TensorFlow, buat beberapa perubahan pada lingkungan:
- Nonaktifkan semua GPU. Ini mencegah kesalahan yang disebabkan oleh semua pekerja yang mencoba menggunakan GPU yang sama. Dalam aplikasi dunia nyata, setiap pekerja akan berada di mesin yang berbeda.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
- Setel ulang variabel lingkungan
TF_CONFIG
(Anda akan mempelajari lebih lanjut tentang ini nanti):
os.environ.pop('TF_CONFIG', None)
- Pastikan direktori saat ini berada di jalur Python—ini memungkinkan notebook untuk mengimpor file yang ditulis oleh
%%writefile
nanti:
if '.' not in sys.path:
sys.path.insert(0, '.')
Sekarang impor TensorFlow:
import tensorflow as tf
Definisi set data dan model
Selanjutnya, buat file mnist_setup.py
dengan model sederhana dan pengaturan dataset. File Python ini akan digunakan oleh proses pekerja dalam tutorial ini:
%%writefile mnist_setup.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the [0, 255] range.
# You need to convert them to float32 with values in the [0, 1] range.
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
return train_dataset
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
Writing mnist_setup.py
Pelatihan model pada pekerja tunggal
Coba latih model untuk sejumlah kecil epoch dan amati hasil dari satu pekerja untuk memastikan semuanya bekerja dengan benar. Saat pelatihan berlangsung, kerugian harus turun dan akurasi harus meningkat.
import mnist_setup
batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz 11493376/11490434 [==============================] - 0s 0us/step 11501568/11490434 [==============================] - 0s 0us/step 2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected Epoch 1/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788 Epoch 2/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185 Epoch 3/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795 <keras.callbacks.History at 0x7f666a2e4510>
Konfigurasi multi-pekerja
Sekarang mari kita memasuki dunia pelatihan multi-pekerja.
Sebuah cluster dengan pekerjaan dan tugas
Di TensorFlow, pelatihan terdistribusi melibatkan: 'cluster'
dengan beberapa pekerjaan, dan setiap pekerjaan mungkin memiliki satu atau beberapa 'task'
.
Anda akan memerlukan variabel lingkungan konfigurasi TF_CONFIG
untuk pelatihan di beberapa mesin, yang masing-masing mungkin memiliki peran yang berbeda. TF_CONFIG
adalah string JSON yang digunakan untuk menentukan konfigurasi cluster untuk setiap pekerja yang merupakan bagian dari cluster.
Ada dua komponen variabel TF_CONFIG
: 'cluster'
dan 'task'
.
Sebuah
'cluster'
adalah sama untuk semua pekerja dan memberikan informasi tentang cluster pelatihan, yang merupakan dict yang terdiri dari berbagai jenis pekerjaan, seperti'worker'
atau'chief'
.- Dalam pelatihan multi-pekerja dengan
tf.distribute.MultiWorkerMirroredStrategy
, biasanya ada satu'worker'
yang mengambil tanggung jawab, seperti menyimpan pos pemeriksaan dan menulis file ringkasan untuk TensorBoard, selain yang dilakukan'worker'
biasa.'worker'
tersebut disebut sebagai pekerja kepala (dengan nama pekerjaan'chief'
). - Biasanya
'chief'
memiliki'index'
0
yang ditunjuk (pada kenyataannya, ini adalah bagaimanatf.distribute.Strategy
diimplementasikan).
- Dalam pelatihan multi-pekerja dengan
Sebuah
'task'
memberikan informasi tentang tugas saat ini dan berbeda untuk setiap pekerja. Ini menentukan'type'
dan'index'
pekerja itu.
Di bawah ini adalah contoh konfigurasi:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
Ini adalah serial TF_CONFIG
yang sama dengan string JSON:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
Perhatikan bahwa tf_config
hanyalah variabel lokal di Python. Untuk dapat menggunakannya untuk konfigurasi pelatihan, dict ini perlu diserialisasikan sebagai JSON dan ditempatkan di variabel lingkungan TF_CONFIG
.
Pada contoh konfigurasi di atas, Anda mengatur tugas 'type'
ke 'worker'
dan tugas 'index'
ke 0
. Oleh karena itu, mesin ini adalah pekerja pertama . Ia akan ditunjuk sebagai pekerja 'chief'
dan melakukan lebih banyak pekerjaan daripada yang lain.
Untuk tujuan ilustrasi, tutorial ini menunjukkan bagaimana Anda dapat menyiapkan variabel TF_CONFIG
dengan dua pekerja di localhost
.
Dalam praktiknya, Anda akan membuat beberapa pekerja pada alamat/port IP eksternal dan menetapkan variabel TF_CONFIG
pada setiap pekerja yang sesuai.
Dalam tutorial ini, Anda akan menggunakan dua pekerja:
-
TF_CONFIG
pekerja pertama ('chief'
) ditunjukkan di atas. - Untuk pekerja kedua, Anda akan mengatur
tf_config['task']['index']=1
Variabel lingkungan dan subproses di notebook
Subproses mewarisi variabel lingkungan dari induknya.
Misalnya, Anda dapat mengatur variabel lingkungan dalam proses Notebook Jupyter ini sebagai berikut:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
Kemudian, Anda dapat mengakses variabel lingkungan dari subproses:
echo ${GREETINGS}
Hello TensorFlow!
Di bagian berikutnya, Anda akan menggunakan metode serupa untuk meneruskan TF_CONFIG
ke subproses pekerja. Dalam skenario dunia nyata, Anda tidak akan meluncurkan pekerjaan Anda dengan cara ini, tetapi cukup dalam contoh ini.
Pilih strategi yang tepat
Di TensorFlow, ada dua bentuk utama pelatihan terdistribusi:
- Pelatihan sinkron , di mana langkah-langkah pelatihan disinkronkan di seluruh pekerja dan replika, dan
- Pelatihan asinkron , di mana langkah-langkah pelatihan tidak disinkronkan secara ketat (misalnya, pelatihan server parameter ).
Tutorial ini menunjukkan cara melakukan pelatihan multi-pekerja sinkron menggunakan instance tf.distribute.MultiWorkerMirroredStrategy
.
MultiWorkerMirroredStrategy
membuat salinan semua variabel di lapisan model pada setiap perangkat di semua pekerja. Ini menggunakan CollectiveOps
, operasi TensorFlow untuk komunikasi kolektif, untuk menggabungkan gradien dan menjaga agar variabel tetap sinkron. Panduan tf.distribute.Strategy
memiliki rincian lebih lanjut tentang strategi ini.
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
MultiWorkerMirroredStrategy
menyediakan beberapa implementasi melalui parameter tf.distribute.experimental.CommunicationOptions
: 1) RING
mengimplementasikan kolektif berbasis cincin menggunakan gRPC sebagai lapisan komunikasi lintas host; 2) NCCL
menggunakan Perpustakaan Komunikasi Kolektif NVIDIA untuk mengimplementasikan kolektif; dan 3) AUTO
menunda pilihan ke runtime. Pilihan terbaik untuk implementasi kolektif tergantung pada jumlah dan jenis GPU, dan interkoneksi jaringan dalam cluster.
Latih modelnya
Dengan integrasi tf.distribute.Strategy
API ke tf.keras
, satu-satunya perubahan yang akan Anda buat untuk mendistribusikan pelatihan ke banyak pekerja adalah melampirkan pembuatan model dan panggilan model.compile()
di dalam strategy.scope()
. Cakupan strategi distribusi menentukan bagaimana dan di mana variabel dibuat, dan dalam kasus MultiWorkerMirroredStrategy
, variabel yang dibuat adalah MirroredVariable
s, dan mereka direplikasi pada setiap pekerja.
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
Untuk benar-benar berjalan dengan MultiWorkerMirroredStrategy
Anda harus menjalankan proses pekerja dan meneruskan TF_CONFIG
kepada mereka.
Seperti file mnist_setup.py
yang ditulis sebelumnya, berikut adalah main.py
yang akan dijalankan oleh masing-masing pekerja:
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist_setup
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
strategy = tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py
Dalam cuplikan kode di atas perhatikan bahwa global_batch_size
, yang diteruskan ke Dataset.batch
, disetel ke per_worker_batch_size * num_workers
. Ini memastikan bahwa setiap pekerja memproses kumpulan contoh per_worker_batch_size
terlepas dari jumlah pekerja.
Direktori saat ini sekarang berisi kedua file Python:
ls *.py
main.py mnist_setup.py
Jadi json-serialize TF_CONFIG
dan tambahkan ke variabel lingkungan:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Sekarang, Anda dapat meluncurkan proses pekerja yang akan menjalankan main.py
dan menggunakan TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
Ada beberapa hal yang perlu diperhatikan tentang perintah di atas:
- Ia menggunakan
%%bash
yang merupakan "ajaib" notebook untuk menjalankan beberapa perintah bash. - Ia menggunakan flag
--bg
untuk menjalankan prosesbash
di latar belakang, karena pekerja ini tidak akan berhenti. Itu menunggu semua pekerja sebelum dimulai.
Proses pekerja di latar belakang tidak akan mencetak output ke buku catatan ini, jadi &>
mengalihkan outputnya ke file sehingga Anda dapat memeriksa apa yang terjadi di file log nanti.
Jadi, tunggu beberapa detik hingga proses dimulai:
import time
time.sleep(10)
Sekarang, periksa apa yang telah dihasilkan ke file log pekerja sejauh ini:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Baris terakhir dari file log harus mengatakan: Started server with target: grpc://localhost:12345
. Pekerja pertama sekarang siap, dan sedang menunggu semua pekerja lain siap untuk melanjutkan.
Jadi perbarui tf_config
untuk proses pekerja kedua untuk mengambil:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
Luncurkan pekerja kedua. Ini akan memulai pelatihan karena semua pekerja aktif (jadi proses ini tidak perlu dilatar belakangi):
python main.py
Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901 2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Jika Anda memeriksa ulang log yang ditulis oleh pekerja pertama, Anda akan mengetahui bahwa ia berpartisipasi dalam pelatihan model tersebut:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
Tidak mengherankan, ini berjalan lebih lambat daripada uji coba di awal tutorial ini.
Menjalankan banyak pekerja pada satu mesin hanya menambah overhead.
Tujuannya di sini bukan untuk meningkatkan waktu pelatihan, tetapi hanya untuk memberikan contoh pelatihan multi-pekerja.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.
Pelatihan multi-pekerja secara mendalam
Sejauh ini, Anda telah mempelajari cara melakukan pengaturan multi-pekerja dasar.
Selama sisa tutorial, Anda akan belajar tentang faktor-faktor lain, yang mungkin berguna atau penting untuk kasus penggunaan nyata, secara rinci.
Pembagian kumpulan data
Dalam pelatihan multi-pekerja, sharding kumpulan data diperlukan untuk memastikan konvergensi dan kinerja.
Contoh di bagian sebelumnya bergantung pada autosharding default yang disediakan oleh tf.distribute.Strategy
API. Anda dapat mengontrol sharding dengan menyetel tf.data.experimental.AutoShardPolicy
dari tf.data.experimental.DistributeOptions
.
Untuk mempelajari lebih lanjut tentang auto-sharding , lihat panduan input terdistribusi .
Berikut adalah contoh cepat cara mematikan sharding otomatis, sehingga setiap replika memproses setiap contoh ( tidak disarankan ):
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
Evaluasi
Jika Anda meneruskan validation_data
ke Model.fit
, itu akan bergantian antara pelatihan dan evaluasi untuk setiap zaman. Evaluasi yang mengambil validation_data
didistribusikan ke seluruh pekerja yang sama dan hasil evaluasi dikumpulkan dan tersedia untuk semua pekerja.
Serupa dengan pelatihan, dataset validasi secara otomatis di-sharding pada tingkat file. Anda perlu mengatur ukuran batch global dalam dataset validasi dan mengatur validation_steps
.
Kumpulan data berulang juga direkomendasikan untuk evaluasi.
Atau, Anda juga dapat membuat tugas lain yang secara berkala membaca pos pemeriksaan dan menjalankan evaluasi. Inilah yang dilakukan oleh Pengukur. Tapi ini bukan cara yang disarankan untuk melakukan evaluasi dan dengan demikian rinciannya dihilangkan.
Pertunjukan
Anda sekarang memiliki model Keras yang siap dijalankan di banyak pekerja dengan MultiWorkerMirroredStrategy
.
Untuk mengubah kinerja pelatihan multi-pekerja, Anda dapat mencoba yang berikut ini:
tf.distribute.MultiWorkerMirroredStrategy
menyediakan beberapa implementasi komunikasi kolektif :-
RING
mengimplementasikan kolektif berbasis cincin menggunakan gRPC sebagai lapisan komunikasi lintas-host. -
NCCL
menggunakan Perpustakaan Komunikasi Kolektif NVIDIA untuk mengimplementasikan kolektif. -
AUTO
menunda pilihan ke runtime.
Pilihan terbaik untuk implementasi kolektif bergantung pada jumlah GPU, jenis GPU, dan interkoneksi jaringan dalam cluster. Untuk mengganti pilihan otomatis, tentukan parameter
communication_options
dari konstruktorMultiWorkerMirroredStrategy
. Sebagai contoh:communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
-
Keluarkan variabel ke
tf.float
jika memungkinkan:- Model ResNet resmi menyertakan contoh bagaimana hal ini dapat dilakukan.
Toleransi kesalahan
Dalam pelatihan sinkron, cluster akan gagal jika salah satu pekerja gagal dan tidak ada mekanisme pemulihan kegagalan.
Menggunakan Keras dengan tf.distribute.Strategy
hadir dengan keuntungan toleransi kesalahan dalam kasus di mana pekerja mati atau tidak stabil. Anda dapat melakukan ini dengan mempertahankan status pelatihan dalam sistem file terdistribusi pilihan Anda, sehingga saat memulai ulang instans yang sebelumnya gagal atau mendahului, status pelatihan dipulihkan.
Ketika seorang pekerja menjadi tidak tersedia, pekerja lain akan gagal (mungkin setelah batas waktu). Dalam kasus seperti itu, pekerja yang tidak tersedia perlu dimulai ulang, serta pekerja lain yang gagal.
Panggilan balik ModelCheckpoint
Callback ModelCheckpoint
tidak lagi menyediakan fungsionalitas toleransi kesalahan, harap gunakan callback BackupAndRestore
sebagai gantinya.
Callback ModelCheckpoint
masih dapat digunakan untuk menyimpan pos pemeriksaan. Tetapi dengan ini, jika pelatihan terganggu atau berhasil diselesaikan, untuk melanjutkan pelatihan dari pos pemeriksaan, pengguna bertanggung jawab untuk memuat model secara manual.
Secara opsional, pengguna dapat memilih untuk menyimpan dan memulihkan model/bobot di luar panggilan balik ModelCheckpoint
.
Menyimpan dan memuat model
Untuk menyimpan model Anda menggunakan model.save
atau tf.saved_model.save
, tujuan penyimpanan harus berbeda untuk setiap pekerja.
- Untuk pekerja non-kepala, Anda perlu menyimpan model ke direktori sementara.
- Untuk kepala, Anda harus menyimpan ke direktori model yang disediakan.
Direktori sementara pada pekerja harus unik untuk mencegah kesalahan yang diakibatkan oleh banyak pekerja yang mencoba menulis ke lokasi yang sama.
Model yang disimpan di semua direktori identik, dan biasanya hanya model yang disimpan oleh kepala yang harus dirujuk untuk memulihkan atau melayani.
Anda harus memiliki beberapa logika pembersihan yang menghapus direktori sementara yang dibuat oleh pekerja setelah pelatihan Anda selesai.
Alasan untuk menghemat kepala dan pekerja pada saat yang sama adalah karena Anda mungkin menggabungkan variabel selama pos pemeriksaan yang mengharuskan kepala dan pekerja untuk berpartisipasi dalam protokol komunikasi allreduce. Di sisi lain, membiarkan kepala dan pekerja menyimpan ke direktori model yang sama akan menghasilkan kesalahan karena pertengkaran.
Menggunakan MultiWorkerMirroredStrategy
, program dijalankan pada setiap pekerja, dan untuk mengetahui apakah pekerja saat ini adalah kepala, dibutuhkan objek penyelesai klaster yang memiliki atribut task_type
dan task_id
:
-
task_type
memberitahu Anda apa pekerjaan saat ini (misalnya'worker'
). -
task_id
memberi tahu Anda pengidentifikasi pekerja. - Pekerja dengan
task_id == 0
ditunjuk sebagai kepala pekerja.
Dalam cuplikan kode di bawah ini, fungsi write_filepath
menyediakan jalur file untuk ditulis, yang bergantung pada task_id
pekerja:
- Untuk pekerja kepala (dengan
task_id == 0
), ia menulis ke jalur file asli. - Untuk pekerja lain, ini membuat direktori sementara—
temp_dir
—dengantask_id
di jalur direktori untuk menulis:
model_path = '/tmp/keras-model'
def _is_chief(task_type, task_id):
# Note: there are two possible `TF_CONFIG` configuration.
# 1) In addition to `worker` tasks, a `chief` task type is use;
# in this case, this function should be modified to
# `return task_type == 'chief'`.
# 2) Only `worker` task type is used; in this case, worker 0 is
# regarded as the chief. The implementation demonstrated here
# is for this case.
# For the purpose of this Colab section, the `task_type is None` case
# is added because it is effectively run with only a single worker.
return (task_type == 'worker' and task_id == 0) or task_type is None
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
Dengan itu, Anda sekarang siap untuk menyimpan:
multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/keras-model/assets INFO:tensorflow:Assets written to: /tmp/keras-model/assets
Seperti dijelaskan di atas, nanti model hanya boleh dimuat dari kepala jalur yang disimpan, jadi mari kita hapus yang sementara yang disimpan oleh pekerja non-kepala:
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(os.path.dirname(write_model_path))
Sekarang, ketika saatnya memuat, mari gunakan API tf.keras.models.load_model
yang nyaman, dan lanjutkan dengan pekerjaan lebih lanjut.
Di sini, asumsikan hanya menggunakan pekerja tunggal untuk memuat dan melanjutkan pelatihan, dalam hal ini Anda tidak memanggil tf.keras.models.load_model
dalam strategy.scope()
lain (perhatikan bahwa strategy = tf.distribute.MultiWorkerMirroredStrategy()
, seperti yang didefinisikan sebelumnya ):
loaded_model = tf.keras.models.load_model(model_path)
# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2 20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773 <keras.callbacks.History at 0x7f6669989750>
Menyimpan dan memulihkan pos pemeriksaan
Di sisi lain, checkpointing memungkinkan Anda untuk menyimpan bobot model Anda dan memulihkannya tanpa harus menyimpan seluruh model.
Di sini, Anda akan membuat satu tf.train.Checkpoint
yang melacak model, yang dikelola oleh tf.train.CheckpointManager
, sehingga hanya pos pemeriksaan terbaru yang dipertahankan:
checkpoint_dir = '/tmp/ckpt'
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
Setelah CheckpointManager
diatur, Anda sekarang siap untuk menyimpan dan menghapus pos pemeriksaan yang telah disimpan oleh pekerja non-kepala:
checkpoint_manager.save()
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(write_checkpoint_dir)
Sekarang, ketika Anda perlu memulihkan model, Anda dapat menemukan pos pemeriksaan terbaru yang disimpan menggunakan fungsi tf.train.latest_checkpoint
yang nyaman. Setelah memulihkan pos pemeriksaan, Anda dapat melanjutkan dengan pelatihan.
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/2 2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. 20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938 <keras.callbacks.History at 0x7f6669589850>
Cadangkan dan Pulihkan panggilan balik
Callback tf.keras.callbacks.BackupAndRestore
menyediakan fungsionalitas toleransi kesalahan dengan mencadangkan model dan nomor epoch saat ini dalam file pos pemeriksaan sementara di bawah argumen backup_dir
ke BackupAndRestore
. Ini dilakukan di akhir setiap epoch.
Setelah pekerjaan terputus dan dimulai ulang, panggilan balik mengembalikan pos pemeriksaan terakhir, dan pelatihan berlanjut dari awal zaman yang terputus. Setiap pelatihan parsial yang sudah dilakukan di epoch yang belum selesai sebelum interupsi akan dibuang, sehingga tidak mempengaruhi status model akhir.
Untuk menggunakannya, berikan instance tf.keras.callbacks.BackupAndRestore
pada panggilan Model.fit
.
Dengan MultiWorkerMirroredStrategy
, jika seorang pekerja terganggu, seluruh klaster akan dijeda hingga pekerja yang terganggu dimulai ulang. Pekerja lain juga akan memulai ulang, dan pekerja yang terputus bergabung kembali dengan cluster. Kemudian, setiap pekerja membaca file pos pemeriksaan yang sebelumnya disimpan dan mengambil status sebelumnya, sehingga memungkinkan cluster untuk kembali sinkron. Kemudian, pelatihan berlanjut.
Callback BackupAndRestore
menggunakan CheckpointManager
untuk menyimpan dan memulihkan status pelatihan, yang menghasilkan file bernama checkpoint yang melacak checkpoint yang ada bersama dengan yang terbaru. Untuk alasan ini, backup_dir
tidak boleh digunakan kembali untuk menyimpan pos pemeriksaan lain untuk menghindari tabrakan nama.
Saat ini, callback BackupAndRestore
mendukung pelatihan pekerja tunggal tanpa strategi— MirroredStrategy
—dan pelatihan multi-pekerja dengan MultiWorkerMirroredStrategy
.
Di bawah ini adalah dua contoh untuk pelatihan multi-pekerja dan pelatihan pekerja tunggal:
# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/3 70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123 Epoch 2/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509 Epoch 3/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614 <keras.callbacks.History at 0x7f6669555d90>
Jika Anda memeriksa direktori backup_dir
yang Anda tentukan di BackupAndRestore
, Anda mungkin melihat beberapa file pos pemeriksaan yang dibuat sementara. File tersebut diperlukan untuk memulihkan instance yang sebelumnya hilang, dan file tersebut akan dihapus oleh library di akhir Model.fit
setelah berhasil keluar dari pelatihan Anda.
Sumber daya tambahan
- Pelatihan Terdistribusi dalam panduan TensorFlow memberikan gambaran umum tentang strategi distribusi yang tersedia.
- Loop pelatihan Kustom dengan Keras dan tutorial MultiWorkerMirroredStrategy menunjukkan cara menggunakan
MultiWorkerMirroredStrategy
dengan Keras dan loop pelatihan kustom. - Lihat model resmi , banyak di antaranya dapat dikonfigurasi untuk menjalankan beberapa strategi distribusi.
- Panduan kinerja yang lebih baik dengan tf.function memberikan informasi tentang strategi dan alat lain, seperti TensorFlow Profiler yang dapat Anda gunakan untuk mengoptimalkan kinerja model TensorFlow Anda.