Pelatihan multi-pekerja dengan Keras

Lihat di TensorFlow.org Jalankan di Google Colab Lihat sumber di GitHub Unduh buku catatan

Ringkasan

Tutorial ini menunjukkan cara melakukan pelatihan terdistribusi multi-pekerja dengan model Keras dan Model.fit API menggunakan tf.distribute.Strategy API—khususnya kelas tf.distribute.MultiWorkerMirroredStrategy . Dengan bantuan strategi ini, model Keras yang dirancang untuk berjalan pada satu pekerja dapat bekerja dengan mulus pada banyak pekerja dengan perubahan kode yang minimal.

Bagi mereka yang tertarik untuk memahami lebih dalam tentang tf.distribute.Strategy API, pelatihan Terdistribusi dalam panduan TensorFlow tersedia untuk ikhtisar tentang strategi distribusi yang didukung TensorFlow.

Untuk mempelajari cara menggunakan MultiWorkerMirroredStrategy dengan Keras dan loop pelatihan kustom, lihat Loop pelatihan kustom dengan Keras dan MultiWorkerMirroredStrategy .

Perhatikan bahwa tujuan dari tutorial ini adalah untuk mendemonstrasikan contoh multi-pekerja minimal dengan dua pekerja.

Mempersiapkan

Mulailah dengan beberapa impor yang diperlukan:

import json
import os
import sys

Sebelum mengimpor TensorFlow, buat beberapa perubahan pada lingkungan:

  1. Nonaktifkan semua GPU. Ini mencegah kesalahan yang disebabkan oleh semua pekerja yang mencoba menggunakan GPU yang sama. Dalam aplikasi dunia nyata, setiap pekerja akan berada di mesin yang berbeda.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
  1. Setel ulang variabel lingkungan TF_CONFIG (Anda akan mempelajari lebih lanjut tentang ini nanti):
os.environ.pop('TF_CONFIG', None)
  1. Pastikan direktori saat ini berada di jalur Python—ini memungkinkan notebook untuk mengimpor file yang ditulis oleh %%writefile nanti:
if '.' not in sys.path:
  sys.path.insert(0, '.')

Sekarang impor TensorFlow:

import tensorflow as tf

Definisi set data dan model

Selanjutnya, buat file mnist_setup.py dengan model sederhana dan pengaturan dataset. File Python ini akan digunakan oleh proses pekerja dalam tutorial ini:

%%writefile mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_cnn_model():
  model = tf.keras.Sequential([
      tf.keras.layers.InputLayer(input_shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
      metrics=['accuracy'])
  return model
Writing mnist_setup.py

Pelatihan model pada pekerja tunggal

Coba latih model untuk sejumlah kecil epoch dan amati hasil dari satu pekerja untuk memastikan semuanya bekerja dengan benar. Saat pelatihan berlangsung, kerugian harus turun dan akurasi harus meningkat.

import mnist_setup

batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 0s 0us/step
11501568/11490434 [==============================] - 0s 0us/step
2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
Epoch 1/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788
Epoch 2/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185
Epoch 3/3
70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795
<keras.callbacks.History at 0x7f666a2e4510>

Konfigurasi multi-pekerja

Sekarang mari kita memasuki dunia pelatihan multi-pekerja.

Sebuah cluster dengan pekerjaan dan tugas

Di TensorFlow, pelatihan terdistribusi melibatkan: 'cluster' dengan beberapa pekerjaan, dan setiap pekerjaan mungkin memiliki satu atau beberapa 'task' .

Anda akan memerlukan variabel lingkungan konfigurasi TF_CONFIG untuk pelatihan di beberapa mesin, yang masing-masing mungkin memiliki peran yang berbeda. TF_CONFIG adalah string JSON yang digunakan untuk menentukan konfigurasi cluster untuk setiap pekerja yang merupakan bagian dari cluster.

Ada dua komponen variabel TF_CONFIG : 'cluster' dan 'task' .

  • Sebuah 'cluster' adalah sama untuk semua pekerja dan memberikan informasi tentang cluster pelatihan, yang merupakan dict yang terdiri dari berbagai jenis pekerjaan, seperti 'worker' atau 'chief' .

    • Dalam pelatihan multi-pekerja dengan tf.distribute.MultiWorkerMirroredStrategy , biasanya ada satu 'worker' yang mengambil tanggung jawab, seperti menyimpan pos pemeriksaan dan menulis file ringkasan untuk TensorBoard, selain yang dilakukan 'worker' biasa. 'worker' tersebut disebut sebagai pekerja kepala (dengan nama pekerjaan 'chief' ).
    • Biasanya 'chief' memiliki 'index' 0 yang ditunjuk (pada kenyataannya, ini adalah bagaimana tf.distribute.Strategy diimplementasikan).
  • Sebuah 'task' memberikan informasi tentang tugas saat ini dan berbeda untuk setiap pekerja. Ini menentukan 'type' dan 'index' pekerja itu.

Di bawah ini adalah contoh konfigurasi:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Ini adalah serial TF_CONFIG yang sama dengan string JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Perhatikan bahwa tf_config hanyalah variabel lokal di Python. Untuk dapat menggunakannya untuk konfigurasi pelatihan, dict ini perlu diserialisasikan sebagai JSON dan ditempatkan di variabel lingkungan TF_CONFIG .

Pada contoh konfigurasi di atas, Anda mengatur tugas 'type' ke 'worker' dan tugas 'index' ke 0 . Oleh karena itu, mesin ini adalah pekerja pertama . Ia akan ditunjuk sebagai pekerja 'chief' dan melakukan lebih banyak pekerjaan daripada yang lain.

Untuk tujuan ilustrasi, tutorial ini menunjukkan bagaimana Anda dapat menyiapkan variabel TF_CONFIG dengan dua pekerja di localhost .

Dalam praktiknya, Anda akan membuat beberapa pekerja pada alamat/port IP eksternal dan menetapkan variabel TF_CONFIG pada setiap pekerja yang sesuai.

Dalam tutorial ini, Anda akan menggunakan dua pekerja:

  • TF_CONFIG pekerja pertama ( 'chief' ) ditunjukkan di atas.
  • Untuk pekerja kedua, Anda akan mengatur tf_config['task']['index']=1

Variabel lingkungan dan subproses di notebook

Subproses mewarisi variabel lingkungan dari induknya.

Misalnya, Anda dapat mengatur variabel lingkungan dalam proses Notebook Jupyter ini sebagai berikut:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Kemudian, Anda dapat mengakses variabel lingkungan dari subproses:

echo ${GREETINGS}
Hello TensorFlow!

Di bagian berikutnya, Anda akan menggunakan metode serupa untuk meneruskan TF_CONFIG ke subproses pekerja. Dalam skenario dunia nyata, Anda tidak akan meluncurkan pekerjaan Anda dengan cara ini, tetapi cukup dalam contoh ini.

Pilih strategi yang tepat

Di TensorFlow, ada dua bentuk utama pelatihan terdistribusi:

  • Pelatihan sinkron , di mana langkah-langkah pelatihan disinkronkan di seluruh pekerja dan replika, dan
  • Pelatihan asinkron , di mana langkah-langkah pelatihan tidak disinkronkan secara ketat (misalnya, pelatihan server parameter ).

Tutorial ini menunjukkan cara melakukan pelatihan multi-pekerja sinkron menggunakan instance tf.distribute.MultiWorkerMirroredStrategy .

MultiWorkerMirroredStrategy membuat salinan semua variabel di lapisan model pada setiap perangkat di semua pekerja. Ini menggunakan CollectiveOps , operasi TensorFlow untuk komunikasi kolektif, untuk menggabungkan gradien dan menjaga agar variabel tetap sinkron. Panduan tf.distribute.Strategy memiliki rincian lebih lanjut tentang strategi ini.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO

MultiWorkerMirroredStrategy menyediakan beberapa implementasi melalui parameter tf.distribute.experimental.CommunicationOptions : 1) RING mengimplementasikan kolektif berbasis cincin menggunakan gRPC sebagai lapisan komunikasi lintas host; 2) NCCL menggunakan Perpustakaan Komunikasi Kolektif NVIDIA untuk mengimplementasikan kolektif; dan 3) AUTO menunda pilihan ke runtime. Pilihan terbaik untuk implementasi kolektif tergantung pada jumlah dan jenis GPU, dan interkoneksi jaringan dalam cluster.

Latih modelnya

Dengan integrasi tf.distribute.Strategy API ke tf.keras , satu-satunya perubahan yang akan Anda buat untuk mendistribusikan pelatihan ke banyak pekerja adalah melampirkan pembuatan model dan panggilan model.compile() di dalam strategy.scope() . Cakupan strategi distribusi menentukan bagaimana dan di mana variabel dibuat, dan dalam kasus MultiWorkerMirroredStrategy , variabel yang dibuat adalah MirroredVariable s, dan mereka direplikasi pada setiap pekerja.

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()

Untuk benar-benar berjalan dengan MultiWorkerMirroredStrategy Anda harus menjalankan proses pekerja dan meneruskan TF_CONFIG kepada mereka.

Seperti file mnist_setup.py yang ditulis sebelumnya, berikut adalah main.py yang akan dijalankan oleh masing-masing pekerja:

%%writefile main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.MultiWorkerMirroredStrategy()

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()


multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py

Dalam cuplikan kode di atas perhatikan bahwa global_batch_size , yang diteruskan ke Dataset.batch , disetel ke per_worker_batch_size * num_workers . Ini memastikan bahwa setiap pekerja memproses kumpulan contoh per_worker_batch_size terlepas dari jumlah pekerja.

Direktori saat ini sekarang berisi kedua file Python:

ls *.py
main.py
mnist_setup.py

Jadi json-serialize TF_CONFIG dan tambahkan ke variabel lingkungan:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Sekarang, Anda dapat meluncurkan proses pekerja yang akan menjalankan main.py dan menggunakan TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Ada beberapa hal yang perlu diperhatikan tentang perintah di atas:

  1. Ia menggunakan %%bash yang merupakan "ajaib" notebook untuk menjalankan beberapa perintah bash.
  2. Ia menggunakan flag --bg untuk menjalankan proses bash di latar belakang, karena pekerja ini tidak akan berhenti. Itu menunggu semua pekerja sebelum dimulai.

Proses pekerja di latar belakang tidak akan mencetak output ke buku catatan ini, jadi &> mengalihkan outputnya ke file sehingga Anda dapat memeriksa apa yang terjadi di file log nanti.

Jadi, tunggu beberapa detik hingga proses dimulai:

import time
time.sleep(10)

Sekarang, periksa apa yang telah dihasilkan ke file log pekerja sejauh ini:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected

Baris terakhir dari file log harus mengatakan: Started server with target: grpc://localhost:12345 . Pekerja pertama sekarang siap, dan sedang menunggu semua pekerja lain siap untuk melanjutkan.

Jadi perbarui tf_config untuk proses pekerja kedua untuk mengambil:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Luncurkan pekerja kedua. Ini akan memulai pelatihan karena semua pekerja aktif (jadi proses ini tidak perlu dilatar belakangi):

python main.py
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.

Jika Anda memeriksa ulang log yang ditulis oleh pekerja pertama, Anda akan mengetahui bahwa ia berpartisipasi dalam pelatihan model tersebut:

cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:0"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722
Epoch 2/3
70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157
Epoch 3/3
70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901

Tidak mengherankan, ini berjalan lebih lambat daripada uji coba di awal tutorial ini.

Menjalankan banyak pekerja pada satu mesin hanya menambah overhead.

Tujuannya di sini bukan untuk meningkatkan waktu pelatihan, tetapi hanya untuk memberikan contoh pelatihan multi-pekerja.

# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Pelatihan multi-pekerja secara mendalam

Sejauh ini, Anda telah mempelajari cara melakukan pengaturan multi-pekerja dasar.

Selama sisa tutorial, Anda akan belajar tentang faktor-faktor lain, yang mungkin berguna atau penting untuk kasus penggunaan nyata, secara rinci.

Pembagian kumpulan data

Dalam pelatihan multi-pekerja, sharding kumpulan data diperlukan untuk memastikan konvergensi dan kinerja.

Contoh di bagian sebelumnya bergantung pada autosharding default yang disediakan oleh tf.distribute.Strategy API. Anda dapat mengontrol sharding dengan menyetel tf.data.experimental.AutoShardPolicy dari tf.data.experimental.DistributeOptions .

Untuk mempelajari lebih lanjut tentang auto-sharding , lihat panduan input terdistribusi .

Berikut adalah contoh cepat cara mematikan sharding otomatis, sehingga setiap replika memproses setiap contoh ( tidak disarankan ):

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF

global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)

Evaluasi

Jika Anda meneruskan validation_data ke Model.fit , itu akan bergantian antara pelatihan dan evaluasi untuk setiap zaman. Evaluasi yang mengambil validation_data didistribusikan ke seluruh pekerja yang sama dan hasil evaluasi dikumpulkan dan tersedia untuk semua pekerja.

Serupa dengan pelatihan, dataset validasi secara otomatis di-sharding pada tingkat file. Anda perlu mengatur ukuran batch global dalam dataset validasi dan mengatur validation_steps .

Kumpulan data berulang juga direkomendasikan untuk evaluasi.

Atau, Anda juga dapat membuat tugas lain yang secara berkala membaca pos pemeriksaan dan menjalankan evaluasi. Inilah yang dilakukan oleh Pengukur. Tapi ini bukan cara yang disarankan untuk melakukan evaluasi dan dengan demikian rinciannya dihilangkan.

Pertunjukan

Anda sekarang memiliki model Keras yang siap dijalankan di banyak pekerja dengan MultiWorkerMirroredStrategy .

Untuk mengubah kinerja pelatihan multi-pekerja, Anda dapat mencoba yang berikut ini:

  • tf.distribute.MultiWorkerMirroredStrategy menyediakan beberapa implementasi komunikasi kolektif :

    • RING mengimplementasikan kolektif berbasis cincin menggunakan gRPC sebagai lapisan komunikasi lintas-host.
    • NCCL menggunakan Perpustakaan Komunikasi Kolektif NVIDIA untuk mengimplementasikan kolektif.
    • AUTO menunda pilihan ke runtime.

    Pilihan terbaik untuk implementasi kolektif bergantung pada jumlah GPU, jenis GPU, dan interkoneksi jaringan dalam cluster. Untuk mengganti pilihan otomatis, tentukan parameter communication_options dari konstruktor MultiWorkerMirroredStrategy . Sebagai contoh:

    communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
    
  • Keluarkan variabel ke tf.float jika memungkinkan:

    • Model ResNet resmi menyertakan contoh bagaimana hal ini dapat dilakukan.

Toleransi kesalahan

Dalam pelatihan sinkron, cluster akan gagal jika salah satu pekerja gagal dan tidak ada mekanisme pemulihan kegagalan.

Menggunakan Keras dengan tf.distribute.Strategy hadir dengan keuntungan toleransi kesalahan dalam kasus di mana pekerja mati atau tidak stabil. Anda dapat melakukan ini dengan mempertahankan status pelatihan dalam sistem file terdistribusi pilihan Anda, sehingga saat memulai ulang instans yang sebelumnya gagal atau mendahului, status pelatihan dipulihkan.

Ketika seorang pekerja menjadi tidak tersedia, pekerja lain akan gagal (mungkin setelah batas waktu). Dalam kasus seperti itu, pekerja yang tidak tersedia perlu dimulai ulang, serta pekerja lain yang gagal.

Panggilan balik ModelCheckpoint

Callback ModelCheckpoint tidak lagi menyediakan fungsionalitas toleransi kesalahan, harap gunakan callback BackupAndRestore sebagai gantinya.

Callback ModelCheckpoint masih dapat digunakan untuk menyimpan pos pemeriksaan. Tetapi dengan ini, jika pelatihan terganggu atau berhasil diselesaikan, untuk melanjutkan pelatihan dari pos pemeriksaan, pengguna bertanggung jawab untuk memuat model secara manual.

Secara opsional, pengguna dapat memilih untuk menyimpan dan memulihkan model/bobot di luar panggilan balik ModelCheckpoint .

Menyimpan dan memuat model

Untuk menyimpan model Anda menggunakan model.save atau tf.saved_model.save , tujuan penyimpanan harus berbeda untuk setiap pekerja.

  • Untuk pekerja non-kepala, Anda perlu menyimpan model ke direktori sementara.
  • Untuk kepala, Anda harus menyimpan ke direktori model yang disediakan.

Direktori sementara pada pekerja harus unik untuk mencegah kesalahan yang diakibatkan oleh banyak pekerja yang mencoba menulis ke lokasi yang sama.

Model yang disimpan di semua direktori identik, dan biasanya hanya model yang disimpan oleh kepala yang harus dirujuk untuk memulihkan atau melayani.

Anda harus memiliki beberapa logika pembersihan yang menghapus direktori sementara yang dibuat oleh pekerja setelah pelatihan Anda selesai.

Alasan untuk menghemat kepala dan pekerja pada saat yang sama adalah karena Anda mungkin menggabungkan variabel selama pos pemeriksaan yang mengharuskan kepala dan pekerja untuk berpartisipasi dalam protokol komunikasi allreduce. Di sisi lain, membiarkan kepala dan pekerja menyimpan ke direktori model yang sama akan menghasilkan kesalahan karena pertengkaran.

Menggunakan MultiWorkerMirroredStrategy , program dijalankan pada setiap pekerja, dan untuk mengetahui apakah pekerja saat ini adalah kepala, dibutuhkan objek penyelesai klaster yang memiliki atribut task_type dan task_id :

  • task_type memberitahu Anda apa pekerjaan saat ini (misalnya 'worker' ).
  • task_id memberi tahu Anda pengidentifikasi pekerja.
  • Pekerja dengan task_id == 0 ditunjuk sebagai kepala pekerja.

Dalam cuplikan kode di bawah ini, fungsi write_filepath menyediakan jalur file untuk ditulis, yang bergantung pada task_id pekerja:

  • Untuk pekerja kepala (dengan task_id == 0 ), ia menulis ke jalur file asli.
  • Untuk pekerja lain, ini membuat direktori sementara— temp_dir —dengan task_id di jalur direktori untuk menulis:
model_path = '/tmp/keras-model'

def _is_chief(task_type, task_id):
  # Note: there are two possible `TF_CONFIG` configuration.
  #   1) In addition to `worker` tasks, a `chief` task type is use;
  #      in this case, this function should be modified to
  #      `return task_type == 'chief'`.
  #   2) Only `worker` task type is used; in this case, worker 0 is
  #      regarded as the chief. The implementation demonstrated here
  #      is for this case.
  # For the purpose of this Colab section, the `task_type is None` case
  # is added because it is effectively run with only a single worker.
  return (task_type == 'worker' and task_id == 0) or task_type is None

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)

Dengan itu, Anda sekarang siap untuk menyimpan:

multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/keras-model/assets
INFO:tensorflow:Assets written to: /tmp/keras-model/assets

Seperti dijelaskan di atas, nanti model hanya boleh dimuat dari kepala jalur yang disimpan, jadi mari kita hapus yang sementara yang disimpan oleh pekerja non-kepala:

if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(os.path.dirname(write_model_path))

Sekarang, ketika saatnya memuat, mari gunakan API tf.keras.models.load_model yang nyaman, dan lanjutkan dengan pekerjaan lebih lanjut.

Di sini, asumsikan hanya menggunakan pekerja tunggal untuk memuat dan melanjutkan pelatihan, dalam hal ini Anda tidak memanggil tf.keras.models.load_model dalam strategy.scope() lain (perhatikan bahwa strategy = tf.distribute.MultiWorkerMirroredStrategy() , seperti yang didefinisikan sebelumnya ):

loaded_model = tf.keras.models.load_model(model_path)

# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2
20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773
<keras.callbacks.History at 0x7f6669989750>

Menyimpan dan memulihkan pos pemeriksaan

Di sisi lain, checkpointing memungkinkan Anda untuk menyimpan bobot model Anda dan memulihkannya tanpa harus menyimpan seluruh model.

Di sini, Anda akan membuat satu tf.train.Checkpoint yang melacak model, yang dikelola oleh tf.train.CheckpointManager , sehingga hanya pos pemeriksaan terbaru yang dipertahankan:

checkpoint_dir = '/tmp/ckpt'

checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Setelah CheckpointManager diatur, Anda sekarang siap untuk menyimpan dan menghapus pos pemeriksaan yang telah disimpan oleh pekerja non-kepala:

checkpoint_manager.save()
if not _is_chief(task_type, task_id):
  tf.io.gfile.rmtree(write_checkpoint_dir)

Sekarang, ketika Anda perlu memulihkan model, Anda dapat menemukan pos pemeriksaan terbaru yang disimpan menggunakan fungsi tf.train.latest_checkpoint yang nyaman. Setelah memulihkan pos pemeriksaan, Anda dapat melanjutkan dengan pelatihan.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/2
2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547
Epoch 2/2
20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938
<keras.callbacks.History at 0x7f6669589850>

Cadangkan dan Pulihkan panggilan balik

Callback tf.keras.callbacks.BackupAndRestore menyediakan fungsionalitas toleransi kesalahan dengan mencadangkan model dan nomor epoch saat ini dalam file pos pemeriksaan sementara di bawah argumen backup_dir ke BackupAndRestore . Ini dilakukan di akhir setiap epoch.

Setelah pekerjaan terputus dan dimulai ulang, panggilan balik mengembalikan pos pemeriksaan terakhir, dan pelatihan berlanjut dari awal zaman yang terputus. Setiap pelatihan parsial yang sudah dilakukan di epoch yang belum selesai sebelum interupsi akan dibuang, sehingga tidak mempengaruhi status model akhir.

Untuk menggunakannya, berikan instance tf.keras.callbacks.BackupAndRestore pada panggilan Model.fit .

Dengan MultiWorkerMirroredStrategy , jika seorang pekerja terganggu, seluruh klaster akan dijeda hingga pekerja yang terganggu dimulai ulang. Pekerja lain juga akan memulai ulang, dan pekerja yang terputus bergabung kembali dengan cluster. Kemudian, setiap pekerja membaca file pos pemeriksaan yang sebelumnya disimpan dan mengambil status sebelumnya, sehingga memungkinkan cluster untuk kembali sinkron. Kemudian, pelatihan berlanjut.

Callback BackupAndRestore menggunakan CheckpointManager untuk menyimpan dan memulihkan status pelatihan, yang menghasilkan file bernama checkpoint yang melacak checkpoint yang ada bersama dengan yang terbaru. Untuk alasan ini, backup_dir tidak boleh digunakan kembali untuk menyimpan pos pemeriksaan lain untuk menghindari tabrakan nama.

Saat ini, callback BackupAndRestore mendukung pelatihan pekerja tunggal tanpa strategi— MirroredStrategy —dan pelatihan multi-pekerja dengan MultiWorkerMirroredStrategy .

Di bawah ini adalah dua contoh untuk pelatihan multi-pekerja dan pelatihan pekerja tunggal:

# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.

callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
  multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
                       epochs=3,
                       steps_per_epoch=70,
                       callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_INT64
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 60000
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:5"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 28
        }
        dim {
          size: 28
        }
      }
      shape {
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}
Epoch 1/3
70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123
Epoch 2/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509
Epoch 3/3
70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614
<keras.callbacks.History at 0x7f6669555d90>

Jika Anda memeriksa direktori backup_dir yang Anda tentukan di BackupAndRestore , Anda mungkin melihat beberapa file pos pemeriksaan yang dibuat sementara. File tersebut diperlukan untuk memulihkan instance yang sebelumnya hilang, dan file tersebut akan dihapus oleh library di akhir Model.fit setelah berhasil keluar dari pelatihan Anda.

Sumber daya tambahan

  1. Pelatihan Terdistribusi dalam panduan TensorFlow memberikan gambaran umum tentang strategi distribusi yang tersedia.
  2. Loop pelatihan Kustom dengan Keras dan tutorial MultiWorkerMirroredStrategy menunjukkan cara menggunakan MultiWorkerMirroredStrategy dengan Keras dan loop pelatihan kustom.
  3. Lihat model resmi , banyak di antaranya dapat dikonfigurasi untuk menjalankan beberapa strategi distribusi.
  4. Panduan kinerja yang lebih baik dengan tf.function memberikan informasi tentang strategi dan alat lain, seperti TensorFlow Profiler yang dapat Anda gunakan untuk mengoptimalkan kinerja model TensorFlow Anda.