Pelatihan server parameter dengan ParameterServerStrategy

Lihat di TensorFlow.org Jalankan di Google Colab Lihat sumber di GitHub Unduh buku catatan

Ringkasan

Pelatihan server parameter adalah metode paralel data umum untuk meningkatkan pelatihan model pada beberapa mesin.

Sebuah cluster pelatihan server parameter terdiri dari pekerja dan server parameter . Variabel dibuat di server parameter dan dibaca serta diperbarui oleh pekerja di setiap langkah. Secara default, pekerja membaca dan memperbarui variabel ini secara independen tanpa menyinkronkan satu sama lain. Inilah sebabnya mengapa terkadang pelatihan gaya server parameter disebut pelatihan asinkron .

Di TensorFlow 2, pelatihan server parameter didukung oleh kelas tf.distribute.experimental.ParameterServerStrategy , yang mendistribusikan langkah-langkah pelatihan ke cluster yang menskalakan hingga ribuan pekerja (disertai dengan server parameter).

Metode pelatihan yang didukung

Ada dua metode pelatihan utama yang didukung:

Sebuah cluster dengan pekerjaan dan tugas

Terlepas dari API pilihan ( Model.fit atau loop pelatihan khusus), pelatihan terdistribusi di TensorFlow 2 melibatkan: 'cluster' dengan beberapa 'jobs' , dan setiap tugas mungkin memiliki satu atau beberapa 'tasks' .

Saat menggunakan pelatihan server parameter, disarankan untuk memiliki:

  • Satu pekerjaan koordinator (yang memiliki nama pekerjaan chief )
  • Beberapa pekerjaan pekerja (pekerjaan nama worker ); dan
  • Beberapa pekerjaan server parameter (nama pekerjaan ps )

Sementara koordinator membuat sumber daya, mengirimkan tugas pelatihan, menulis pos pemeriksaan, dan menangani kegagalan tugas, pekerja dan server parameter menjalankan tf.distribute.Server yang mendengarkan permintaan dari koordinator.

Pelatihan server parameter dengan Model.fit API

Pelatihan server parameter dengan Model.fit API memerlukan koordinator untuk menggunakan objek tf.distribute.experimental.ParameterServerStrategy , dan tf.keras.utils.experimental.DatasetCreator sebagai input. Mirip dengan penggunaan Model.fit tanpa strategi, atau dengan strategi lain, alur kerja melibatkan pembuatan dan kompilasi model, menyiapkan callback, diikuti dengan panggilan Model.fit .

Pelatihan server parameter dengan loop pelatihan khusus

Dengan loop pelatihan khusus, kelas tf.distribute.experimental.coordinator.ClusterCoordinator adalah komponen kunci yang digunakan untuk koordinator.

API terpenting yang disediakan oleh objek ClusterCoordinator adalah schedule :

  • API schedule mengantrekan tf.function dan segera mengembalikan RemoteValue seperti masa depan.
  • Fungsi antrian akan dikirim ke pekerja jarak jauh di utas latar belakang dan RemoteValue s mereka akan diisi secara asinkron.
  • Karena schedule tidak memerlukan penugasan pekerja, fungsi tf.function diteruskan dapat dijalankan pada pekerja mana pun yang tersedia.
  • Jika pekerja yang dieksekusi menjadi tidak tersedia sebelum selesai, fungsi tersebut akan dicoba kembali pada pekerja lain yang tersedia.
  • Karena fakta ini dan fakta bahwa eksekusi fungsi tidak atomik, suatu fungsi dapat dieksekusi lebih dari satu kali.

Selain mengirimkan fungsi jarak jauh, ClusterCoordinator juga membantu membuat kumpulan data pada semua pekerja dan membangun kembali kumpulan data ini saat pekerja pulih dari kegagalan.

Pengaturan tutorial

Tutorial akan bercabang menjadi Model.fit dan jalur loop pelatihan khusus, dan Anda dapat memilih salah satu yang sesuai dengan kebutuhan Anda. Bagian selain "Pelatihan dengan X" berlaku untuk kedua jalur.

pip install portpicker

Pengaturan kluster

Seperti disebutkan di atas, kluster pelatihan server parameter memerlukan tugas koordinator yang menjalankan program pelatihan Anda, satu atau beberapa pekerja dan tugas server parameter yang menjalankan server tf.distribute.Server —dan mungkin tugas evaluasi tambahan yang menjalankan evaluasi side-car (lihat bagian evaluasi mobil samping di bawah). Persyaratan untuk mengaturnya adalah:

  • Tugas koordinator perlu mengetahui alamat dan port semua server TensorFlow lainnya kecuali evaluator.
  • Pekerja dan server parameter perlu mengetahui port mana yang perlu mereka dengarkan. Demi kesederhanaan, Anda biasanya dapat meneruskan informasi cluster lengkap saat membuat server TensorFlow pada tugas ini.
  • Tugas evaluator tidak harus mengetahui penyiapan kluster pelatihan. Jika ya, seharusnya tidak mencoba menyambung ke kluster pelatihan.
  • Pekerja dan server parameter harus memiliki jenis tugas masing-masing sebagai "worker" dan "ps" . Koordinator harus menggunakan "chief" sebagai jenis tugas untuk alasan warisan.

Dalam tutorial ini, Anda akan membuat cluster dalam proses sehingga pelatihan seluruh parameter server dapat dijalankan di Colab. Anda akan mempelajari cara menyiapkan kluster nyata di bagian selanjutnya.

Cluster dalam proses

Anda akan mulai dengan membuat beberapa server TensorFlow terlebih dahulu dan menghubungkannya nanti. Perhatikan bahwa ini hanya untuk tujuan demonstrasi tutorial ini, dan dalam pelatihan nyata server akan dimulai pada mesin "worker" dan "ps" .

def create_in_process_cluster(num_workers, num_ps):
  """Creates and starts local servers and returns the cluster_resolver."""
  worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
  ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]

  cluster_dict = {}
  cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
  if num_ps > 0:
    cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]

  cluster_spec = tf.train.ClusterSpec(cluster_dict)

  # Workers need some inter_ops threads to work properly.
  worker_config = tf.compat.v1.ConfigProto()
  if multiprocessing.cpu_count() < num_workers + 1:
    worker_config.inter_op_parallelism_threads = num_workers + 1

  for i in range(num_workers):
    tf.distribute.Server(
        cluster_spec,
        job_name="worker",
        task_index=i,
        config=worker_config,
        protocol="grpc")

  for i in range(num_ps):
    tf.distribute.Server(
        cluster_spec,
        job_name="ps",
        task_index=i,
        protocol="grpc")

  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, rpc_layer="grpc")
  return cluster_resolver

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)

Pengaturan cluster dalam proses sering digunakan dalam pengujian unit, seperti di sini .

Pilihan lain untuk pengujian lokal adalah meluncurkan proses pada mesin lokal—lihat Pelatihan multi-pekerja dengan Keras untuk contoh pendekatan ini.

Buat Instansi ParameterServerStrategy

Sebelum Anda mendalami kode pelatihan, mari kita buat instance objek ParameterServerStrategy . Perhatikan bahwa ini diperlukan terlepas dari apakah Anda melanjutkan dengan Model.fit atau loop pelatihan khusus. Argumen variable_partitioner akan dijelaskan di bagian Variable sharding .

variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.experimental.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

Untuk menggunakan GPU untuk pelatihan, alokasikan GPU yang terlihat oleh setiap pekerja. ParameterServerStrategy akan menggunakan semua GPU yang tersedia pada setiap pekerja, dengan batasan bahwa semua pekerja harus memiliki jumlah GPU yang sama.

Pecahan variabel

Pecahan variabel mengacu pada pemisahan variabel menjadi beberapa variabel yang lebih kecil, yang disebut pecahan . Pecahan variabel mungkin berguna untuk mendistribusikan beban jaringan saat mengakses pecahan ini. Hal ini juga berguna untuk mendistribusikan komputasi dan penyimpanan variabel normal di beberapa server parameter.

Untuk mengaktifkan sharding variabel, Anda dapat memasukkan variable_partitioner saat membuat objek ParameterServerStrategy . variable_partitioner akan dipanggil setiap kali variabel dibuat dan diharapkan untuk mengembalikan jumlah pecahan di sepanjang setiap dimensi variabel. Beberapa variable_partitioner s out-of-box disediakan seperti tf.distribute.experimental.partitioners.MinSizePartitioner . Disarankan untuk menggunakan partisi berbasis ukuran seperti tf.distribute.experimental.partitioners.MinSizePartitioner untuk menghindari mempartisi variabel kecil, yang dapat berdampak negatif pada kecepatan pelatihan model.

Ketika variable_partitioner diteruskan dan jika Anda membuat variabel langsung di bawah strategy.scope() , itu akan menjadi tipe penampung dengan properti variables yang menyediakan akses ke daftar pecahan. Dalam kebanyakan kasus, penampung ini akan secara otomatis dikonversi ke Tensor dengan menggabungkan semua pecahan. Akibatnya, dapat digunakan sebagai variabel normal. Di sisi lain, beberapa metode TensorFlow seperti tf.nn.embedding_lookup menyediakan implementasi yang efisien untuk jenis penampung ini dan dalam metode ini penggabungan otomatis akan dihindari.

Silakan lihat dokumen API tf.distribute.experimental.ParameterServerStrategy untuk detail selengkapnya.

Pelatihan dengan Model.fit

Keras menyediakan API pelatihan yang mudah digunakan melalui Model.fit yang menangani loop pelatihan di bawah tenda, dengan fleksibilitas train_step yang dapat diganti, dan callback, yang menyediakan fungsionalitas seperti penyimpanan pos pemeriksaan atau penyimpanan ringkasan untuk TensorBoard. Dengan Model.fit , kode pelatihan yang sama dapat digunakan untuk strategi lain dengan pertukaran sederhana dari objek strategi.

Memasukan data

Model.fit dengan pelatihan server parameter mengharuskan data input disediakan dalam callable yang mengambil argumen tunggal bertipe tf.distribute.InputContext , dan mengembalikan tf.data.Dataset . Kemudian, buat objek tf.keras.utils.experimental.DatasetCreator yang menggunakan callable tersebut, dan objek tf.distribute.InputOptions opsional melalui argumen input_options .

Perhatikan bahwa dianjurkan untuk mengacak dan mengulang data dengan pelatihan parameter server, dan menentukan steps_per_epoch dalam panggilan yang fit sehingga perpustakaan mengetahui batas-batas zaman.

Silakan lihat tutorial input Terdistribusi untuk informasi lebih lanjut tentang argumen InputContext .

def dataset_fn(input_context):
  global_batch_size = 64
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)

  x = tf.random.uniform((10, 10))
  y = tf.random.uniform((10,))

  dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
  dataset = dataset.shard(
      input_context.num_input_pipelines,
      input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2)

  return dataset

dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)

Kode di dataset_fn akan dipanggil pada perangkat input, yang biasanya CPU, pada setiap mesin pekerja.

Konstruksi dan kompilasi model

Sekarang, Anda akan membuat tf.keras.Model tf.keras.models.Sequential sepele untuk tujuan demonstrasi—diikuti oleh panggilan Model.compile untuk memasukkan komponen, seperti pengoptimal, metrik, atau parameter seperti steps_per_execution :

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])

  model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)

Panggilan balik dan pelatihan

Sebelum Anda memanggil model.fit untuk pelatihan yang sebenarnya, mari siapkan panggilan balik yang diperlukan untuk tugas-tugas umum, seperti:

  • ModelCheckpoint : untuk menyimpan bobot model.
  • BackupAndRestore : untuk memastikan progres pelatihan dicadangkan secara otomatis, dan dipulihkan jika cluster mengalami unavailability (seperti abort atau preemption); atau
  • TensorBoard : untuk menyimpan laporan kemajuan ke dalam file ringkasan, yang divisualisasikan di alat TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')

callbacks = [
    tf.keras.callbacks.TensorBoard(log_dir=log_dir),
    tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
    tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]

model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step
Epoch 2/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step
Epoch 3/5
WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for  more details.
20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step
Epoch 4/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step
Epoch 5/5
INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets
20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step
<keras.callbacks.History at 0x7f89984ca890>

Penggunaan langsung dengan ClusterCoordinator (opsional)

Bahkan jika Anda memilih jalur pelatihan Model.fit , Anda dapat secara opsional membuat instance objek tf.distribute.experimental.coordinator.ClusterCoordinator untuk menjadwalkan fungsi lain yang ingin Anda jalankan pada pekerja. Lihat bagian Pelatihan dengan loop pelatihan khusus untuk detail dan contoh selengkapnya.

Pelatihan dengan loop pelatihan khusus

Menggunakan loop pelatihan khusus dengan tf.distribute.Strategy memberikan fleksibilitas tinggi untuk menentukan loop pelatihan. Dengan ParameterServerStrategy yang didefinisikan di atas (sebagai strategy ), Anda akan menggunakan tf.distribute.experimental.coordinator.ClusterCoordinator untuk mengirimkan eksekusi langkah-langkah pelatihan ke pekerja jarak jauh.

Kemudian, Anda akan membuat model, mendefinisikan kumpulan data dan fungsi langkah, seperti yang telah Anda lakukan di loop pelatihan dengan tf.distribute.Strategy s lainnya. Anda dapat menemukan detail lebih lanjut di pelatihan Kustom dengan tutorial tf.distribute.Strategy .

Untuk memastikan pengambilan awal set data yang efisien, gunakan API pembuatan set data terdistribusi yang direkomendasikan yang disebutkan di bagian Langkah pelatihan pengiriman ke pekerja jarak jauh di bawah ini. Selain itu, pastikan untuk memanggil Strategy.run di dalam worker_fn untuk memanfaatkan sepenuhnya GPU yang dialokasikan untuk pekerja. Langkah-langkah selanjutnya sama untuk pelatihan dengan atau tanpa GPU.

Mari kita buat komponen-komponen ini dalam langkah-langkah berikut:

Siapkan datanya

Pertama, tulis fungsi yang membuat kumpulan data yang menyertakan logika prapemrosesan yang diimplementasikan oleh lapisan prapemrosesan Keras .

Anda akan membuat lapisan ini di luar dataset_fn tetapi menerapkan transformasi di dalam dataset_fn , karena Anda akan membungkus dataset_fn menjadi tf.function , yang tidak mengizinkan variabel dibuat di dalamnya.

feature_vocab = [
    "avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]

with strategy.scope():
  feature_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=feature_vocab,
      mask_token=None)
  label_lookup_layer = tf.keras.layers.StringLookup(
      vocabulary=label_vocab,
      num_oov_indices=0,
      mask_token=None)

  raw_feature_input = tf.keras.layers.Input(
      shape=(3,),
      dtype=tf.string,
      name="feature")
  feature_id_input = feature_lookup_layer(raw_feature_input)
  feature_preprocess_stage = tf.keras.Model(
      {"features": raw_feature_input},
      feature_id_input)

  raw_label_input = tf.keras.layers.Input(
      shape=(1,),
      dtype=tf.string,
      name="label")
  label_id_input = label_lookup_layer(raw_label_input)

  label_preprocess_stage = tf.keras.Model(
      {"label": raw_label_input},
      label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return bool(asarray(a1 == a2).all())

Buat contoh mainan dalam kumpulan data:

def feature_and_label_gen(num_examples=200):
  examples = {"features": [], "label": []}
  for _ in range(num_examples):
    features = random.sample(feature_vocab, 3)
    label = ["yes"] if "avenger" in features else ["no"]
    examples["features"].append(features)
    examples["label"].append(label)
  return examples

examples = feature_and_label_gen()

Kemudian, buat set data pelatihan yang dibungkus dengan dataset_fn :

def dataset_fn(_):
  raw_dataset = tf.data.Dataset.from_tensor_slices(examples)

  train_dataset = raw_dataset.map(
      lambda x: (
          {"features": feature_preprocess_stage(x["features"])},
          label_preprocess_stage(x["label"])
      )).shuffle(200).batch(32).repeat()
  return train_dataset

Bangun modelnya

Selanjutnya, buat model dan objek lainnya. Pastikan untuk membuat semua variabel di bawah strategy.scope .

# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
  # Create the model. The input needs to be compatible with Keras processing layers.
  model_input = tf.keras.layers.Input(
      shape=(3,), dtype=tf.int64, name="model_input")

  emb_layer = tf.keras.layers.Embedding(
      input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
  emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
  dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
  model = tf.keras.Model({"features": model_input}, dense_output)

  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
  accuracy = tf.keras.metrics.Accuracy()

Mari kita konfirmasikan bahwa penggunaan FixedShardsPartitioner membagi semua variabel menjadi dua pecahan dan setiap pecahan ditugaskan ke server parameter yang berbeda:

assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"

Tentukan langkah pelatihan

Ketiga, buat langkah pelatihan yang dibungkus dengan tf.function :

@tf.function
def step_fn(iterator):

  def replica_fn(batch_data, labels):
    with tf.GradientTape() as tape:
      pred = model(batch_data, training=True)
      per_example_loss = tf.keras.losses.BinaryCrossentropy(
              reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
      loss = tf.nn.compute_average_loss(per_example_loss)
      gradients = tape.gradient(loss, model.trainable_variables)

    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    accuracy.update_state(labels, actual_pred)
    return loss

  batch_data, labels = next(iterator)
  losses = strategy.run(replica_fn, args=(batch_data, labels))
  return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)

Dalam fungsi langkah pelatihan di atas, memanggil Strategy.run dan Strategy.reduce di step_fn dapat mendukung beberapa GPU per pekerja. Jika para pekerja memiliki GPU yang dialokasikan, Strategy.run akan mendistribusikan kumpulan data pada beberapa replika.

Mengirimkan langkah-langkah pelatihan ke pekerja jarak jauh

Setelah semua perhitungan ditentukan oleh ParameterServerStrategy , Anda akan menggunakan kelas tf.distribute.experimental.coordinator.ClusterCoordinator untuk membuat sumber daya dan mendistribusikan langkah-langkah pelatihan ke pekerja jarak jauh.

Pertama-tama mari kita buat objek ClusterCoordinator dan berikan objek strategi:

coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)

Kemudian, buat set data per pekerja dan iterator. Dalam per_worker_dataset_fn di bawah ini, disarankan untuk dataset_fn ke dalam strategy.distribute_datasets_from_function untuk memungkinkan pra-pengambilan yang efisien ke GPU dengan mulus.

@tf.function
def per_worker_dataset_fn():
  return strategy.distribute_datasets_from_function(dataset_fn)

per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).

Langkah terakhir adalah mendistribusikan perhitungan ke pekerja jarak jauh menggunakan ClusterCoordinator.schedule :

  • Metode schedule mengantrekan tf.function dan segera mengembalikan RemoteValue seperti masa depan. Fungsi yang diantrekan akan dikirim ke pekerja jarak jauh di utas latar belakang dan RemoteValue akan diisi secara asinkron.
  • Metode join ( ClusterCoordinator.join ) dapat digunakan untuk menunggu sampai semua fungsi terjadwal dieksekusi.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
  accuracy.reset_states()
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  # Wait at epoch boundaries.
  coordinator.join()
  print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
Finished epoch 0, accuracy is 0.543750.
Finished epoch 1, accuracy is 0.543750.
Finished epoch 2, accuracy is 0.950000.
Finished epoch 3, accuracy is 1.000000.

Inilah cara Anda mengambil hasil dari RemoteValue :

loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000

Atau, Anda dapat meluncurkan semua langkah dan melakukan sesuatu sambil menunggu penyelesaian:

for _ in range(total_steps):
  coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
  time.sleep(10)
  # Do something like logging metrics or writing checkpoints.

Untuk pelatihan lengkap dan alur kerja penyajian untuk contoh khusus ini, silakan lihat pengujian ini .

Selengkapnya tentang pembuatan kumpulan data

Dataset dalam kode di atas dibuat menggunakan ClusterCoordinator.create_per_worker_dataset API). Ini membuat satu set data per pekerja dan mengembalikan objek kontainer. Anda dapat memanggil metode iter untuk membuat iterator per pekerja. Iterator per pekerja berisi satu iterator per pekerja dan potongan pekerja yang sesuai akan diganti dalam argumen input dari fungsi yang diteruskan ke metode ClusterCoordinator.schedule sebelum fungsi dijalankan pada pekerja tertentu.

Saat ini, metode ClusterCoordinator.schedule mengasumsikan pekerja setara dan dengan demikian mengasumsikan kumpulan data pada pekerja yang berbeda adalah sama kecuali mereka mungkin diacak secara berbeda jika berisi operasi Dataset.shuffle . Karena itu, juga disarankan agar kumpulan data diulang tanpa batas waktu dan Anda menjadwalkan sejumlah langkah yang terbatas daripada mengandalkan OutOfRangeError dari kumpulan data.

Catatan penting lainnya adalah bahwa kumpulan data tf.data tidak mendukung serialisasi dan deserialisasi implisit melintasi batas tugas. Jadi, penting untuk membuat seluruh dataset di dalam fungsi yang diteruskan ke ClusterCoordinator.create_per_worker_dataset .

Evaluasi

Ada lebih dari satu cara untuk mendefinisikan dan menjalankan lingkaran evaluasi dalam pelatihan terdistribusi. Masing-masing memiliki pro dan kontra sendiri seperti yang dijelaskan di bawah ini. Metode evaluasi sebaris disarankan jika Anda tidak memiliki preferensi.

Evaluasi sebaris

Dalam metode ini, koordinator bergantian antara pelatihan dan evaluasi dan oleh karena itu disebut evaluasi sebaris .

Ada beberapa manfaat dari evaluasi inline. Sebagai contoh:

  • Ini dapat mendukung model evaluasi besar dan kumpulan data evaluasi yang tidak dapat dimiliki oleh satu tugas.
  • Hasil evaluasi dapat digunakan untuk mengambil keputusan untuk pelatihan epoch selanjutnya.

Ada dua cara untuk menerapkan evaluasi inline: evaluasi langsung dan evaluasi terdistribusi.

  • Evaluasi langsung : Untuk model kecil dan dataset evaluasi, koordinator dapat menjalankan evaluasi secara langsung pada model terdistribusi dengan dataset evaluasi pada koordinator:
eval_dataset = tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).batch(8)

eval_accuracy = tf.keras.metrics.Accuracy()

for batch_data, labels in eval_dataset:
  pred = model(batch_data, training=False)
  actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
  eval_accuracy.update_state(labels, actual_pred)

print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Evaluation accuracy: 1.000000
  • Evaluasi terdistribusi : Untuk model besar atau kumpulan data yang tidak layak dijalankan langsung pada koordinator, tugas koordinator dapat mendistribusikan tugas evaluasi kepada pekerja melalui metode ClusterCoordinator.schedule / ClusterCoordinator.join :
with strategy.scope():
  # Define the eval metric on parameter servers.
  eval_accuracy = tf.keras.metrics.Accuracy()

@tf.function
def eval_step(iterator):
  def replica_fn(batch_data, labels):
    pred = model(batch_data, training=False)
    actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
    eval_accuracy.update_state(labels, actual_pred)
  batch_data, labels = next(iterator)
  strategy.run(replica_fn, args=(batch_data, labels))

def eval_dataset_fn():
  return tf.data.Dataset.from_tensor_slices(
      feature_and_label_gen(num_examples=16)).map(
          lambda x: (
              {"features": feature_preprocess_stage(x["features"])},
              label_preprocess_stage(x["label"])
          )).shuffle(16).repeat().batch(8)

per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)

eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
  coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources
Evaluation accuracy: 1.000000

Evaluasi mobil samping

Metode lain disebut evaluasi mobil samping di mana Anda membuat tugas evaluator khusus yang berulang kali membaca pos pemeriksaan dan menjalankan evaluasi pada pos pemeriksaan terbaru. Ini memungkinkan program pelatihan Anda selesai lebih awal jika Anda tidak perlu mengubah lingkaran pelatihan berdasarkan hasil evaluasi. Namun, ini membutuhkan tugas evaluator tambahan dan pos pemeriksaan berkala untuk memicu evaluasi. Berikut ini adalah kemungkinan putaran evaluasi mobil samping:

checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)

for latest_checkpoint in tf.train.checkpoints_iterator(
    checkpoint_dir):
  try:
    checkpoint.restore(latest_checkpoint).expect_partial()
  except (tf.errors.OpError,) as e:
    # checkpoint may be deleted by training when it is about to read it.
    continue

  # Optionally add callbacks to write summaries.
  eval_model.evaluate(eval_data)

  # Evaluation finishes when it has evaluated the last epoch.
  if latest_checkpoint.endswith('-{}'.format(train_epoches)):
    break

Cluster di dunia nyata

Dalam lingkungan produksi nyata, Anda akan menjalankan semua tugas dalam proses yang berbeda pada mesin yang berbeda. Cara paling sederhana untuk mengonfigurasi informasi cluster pada setiap tugas adalah dengan menyetel variabel lingkungan "TF_CONFIG" dan menggunakan tf.distribute.cluster_resolver.TFConfigClusterResolver untuk mengurai "TF_CONFIG" .

Untuk deskripsi umum tentang variabel lingkungan "TF_CONFIG" , lihat panduan pelatihan Terdistribusi .

Jika Anda memulai tugas pelatihan menggunakan Kubernetes atau template konfigurasi lainnya, kemungkinan besar template ini telah menyetel “TF_CONFIG" untuk Anda.

Setel variabel lingkungan "TF_CONFIG"

Misalkan Anda memiliki 3 pekerja dan 2 server parameter, "TF_CONFIG" pekerja 1 dapat berupa:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

"TF_CONFIG" dari evaluator dapat berupa:

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "evaluator": ["host7:port"]
    },
    "task": {"type": "evaluator", "index": 0}
})

Bagian "cluster" dalam string "TF_CONFIG" di atas untuk evaluator adalah opsional.

Jika Anda menggunakan biner yang sama untuk semua tugas

Jika Anda lebih suka menjalankan semua tugas ini menggunakan biner tunggal, Anda harus membiarkan program Anda bercabang ke peran yang berbeda di awal:

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
  # Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
  # Run side-car evaluation
else:
  # Run the coordinator.

Kode berikut memulai server TensorFlow dan menunggu:

# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)
server.join()

Menangani kegagalan tugas

Kegagalan pekerja

tf.distribute.experimental.coordinator.ClusterCoordinator atau Model.fit menyediakan toleransi kesalahan bawaan untuk kegagalan pekerja. Setelah pemulihan pekerja, fungsi kumpulan data yang disediakan sebelumnya (baik ke ClusterCoordinator.create_per_worker_dataset untuk loop pelatihan khusus, atau tf.keras.utils.experimental.DatasetCreator untuk Model.fit ) akan dipanggil pada pekerja untuk membuat kembali kumpulan data.

Server parameter atau kegagalan koordinator

Namun, ketika koordinator melihat kesalahan server parameter, itu akan segera memunculkan UnavailableError atau AbortedError . Anda dapat memulai ulang koordinator dalam kasus ini. Koordinator itu sendiri juga bisa menjadi tidak tersedia. Oleh karena itu, perkakas tertentu direkomendasikan agar tidak kehilangan kemajuan pelatihan:

  • Untuk Model.fit , Anda harus menggunakan callback BackupAndRestore , yang menangani penyimpanan dan pemulihan progres secara otomatis. Lihat bagian Callback dan pelatihan di atas untuk contoh.

  • Untuk loop pelatihan khusus, Anda harus memeriksa variabel model secara berkala dan memuat variabel model dari pos pemeriksaan, jika ada, sebelum pelatihan dimulai. Kemajuan pelatihan dapat disimpulkan kira-kira dari optimizer.iterations . iterasi jika pengoptimal diperiksa:

checkpoint_manager = tf.train.CheckpointManager(
    tf.train.Checkpoint(model=model, optimizer=optimizer),
    checkpoint_dir,
    max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
  checkpoint = checkpoint_manager.checkpoint
  checkpoint.restore(
      checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()

global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch

for _ in range(starting_epoch, num_epoches):
  for _ in range(steps_per_epoch):
    coordinator.schedule(step_fn, args=(per_worker_iterator,))
  coordinator.join()
  checkpoint_manager.save()

Mengambil RemoteValue

Mengambil RemoteValue dijamin berhasil jika suatu fungsi berhasil dijalankan. Ini karena saat ini nilai kembalian segera disalin ke koordinator setelah suatu fungsi dijalankan. Jika ada kegagalan pekerja selama penyalinan, fungsi tersebut akan dicoba kembali pada pekerja lain yang tersedia. Oleh karena itu, jika Anda ingin mengoptimalkan kinerja, Anda dapat menjadwalkan fungsi tanpa nilai balik.

Pelaporan kesalahan

Setelah koordinator melihat kesalahan seperti UnavailableError dari server parameter atau kesalahan aplikasi lain seperti InvalidArgument dari tf.debugging.check_numerics , itu akan membatalkan semua fungsi tertunda dan antri sebelum meningkatkan kesalahan. Mengambil RemoteValue s yang sesuai akan memunculkan CancelledError .

Setelah kesalahan dimunculkan, koordinator tidak akan memunculkan kesalahan yang sama atau kesalahan apa pun dari fungsi yang dibatalkan.

Peningkatan performa

Ada beberapa kemungkinan alasan jika Anda melihat masalah kinerja saat Anda berlatih dengan ParameterServerStrategy dan ClusterResolver .

Salah satu alasan umum adalah server parameter memiliki beban yang tidak seimbang dan beberapa server parameter dengan beban berat telah mencapai kapasitas. Ada juga beberapa akar penyebab. Beberapa metode sederhana untuk mengurangi masalah ini adalah dengan:

  1. Pisahkan variabel model besar Anda dengan menentukan variable_partitioner saat membuat ParameterServerStrategy .
  2. Hindari membuat variabel hotspot yang diperlukan oleh semua server parameter dalam satu langkah jika memungkinkan. Misalnya, gunakan tingkat pembelajaran konstan atau subkelas tf.keras.optimizers.schedules.LearningRateSchedule di pengoptimal karena perilaku default adalah bahwa tingkat pembelajaran akan menjadi variabel yang ditempatkan pada server parameter tertentu dan diminta oleh semua server parameter lain di setiap langkah .
  3. Acak kosakata besar Anda sebelum meneruskannya ke lapisan prapemrosesan Keras.

Alasan lain yang mungkin untuk masalah kinerja adalah koordinator. Implementasi schedule / join pertama Anda berbasis Python dan karenanya mungkin memiliki overhead threading. Latensi antara koordinator dan pekerja juga bisa besar. Jika ini masalahnya,

  • Untuk Model.fit , Anda dapat mengatur argumen steps_per_execution yang disediakan di Model.compile ke nilai yang lebih besar dari 1.

  • Untuk loop pelatihan khusus, Anda dapat mengemas beberapa langkah ke dalam satu tf.function :

steps_per_invocation = 10

@tf.function
def step_fn(iterator):
  for _ in range(steps_per_invocation):
    features, labels = next(iterator)
    def replica_fn(features, labels):
      ...

    strategy.run(replica_fn, args=(features, labels))

Karena perpustakaan dioptimalkan lebih lanjut, semoga sebagian besar pengguna tidak perlu mengemas langkah-langkah secara manual di masa mendatang.

Selain itu, trik kecil untuk peningkatan kinerja adalah menjadwalkan fungsi tanpa nilai kembalian seperti yang dijelaskan di bagian kegagalan tugas penanganan di atas.

Batasan yang diketahui

Sebagian besar batasan yang diketahui sudah tercakup dalam bagian di atas. Bagian ini memberikan ringkasan.

ParameterServerStrategy umum

  • os.environment["grpc_fail_fast"]="use_caller" diperlukan pada setiap tugas termasuk koordinator, untuk membuat toleransi kesalahan bekerja dengan baik.
  • Pelatihan server parameter sinkron tidak didukung.
  • Biasanya diperlukan untuk mengemas beberapa langkah ke dalam satu fungsi untuk mencapai kinerja yang optimal.
  • Tidak didukung untuk memuat save_model melalui tf.saved_model.load yang berisi variabel sharded. Catatan memuat save_model seperti itu menggunakan TensorFlow Serving diharapkan berfungsi.
  • Tidak didukung untuk memuat pos pemeriksaan yang berisi variabel slot pengoptimal pecahan ke dalam jumlah pecahan yang berbeda.
  • Tidak didukung untuk memulihkan dari kegagalan server parameter tanpa memulai ulang tugas koordinator.
  • Penggunaan tf.lookup.StaticHashTable (yang biasanya digunakan oleh beberapa lapisan prapemrosesan Keras, seperti tf.keras.layers.IntegerLookup , tf.keras.layers.StringLookup , dan tf.keras.layers.TextVectorization ) menghasilkan sumber daya yang ditempatkan pada koordinator saat ini dengan pelatihan parameter server. Ini memiliki implikasi kinerja untuk RPC pencarian dari pekerja ke koordinator. Ini adalah prioritas tinggi saat ini untuk ditangani.

Spesifikasi Model.fit

  • argumen steps_per_epoch diperlukan di Model.fit . Anda dapat memilih nilai yang menyediakan interval yang sesuai dalam suatu zaman.
  • ParameterServerStrategy tidak memiliki dukungan untuk panggilan balik kustom yang memiliki panggilan tingkat batch untuk alasan kinerja. Anda harus mengonversi panggilan tersebut menjadi panggilan tingkat Epos dengan steps_per_epoch yang dipilih dengan tepat, sehingga panggilan tersebut dipanggil setiap steps_per_epoch sejumlah langkah. Callback bawaan tidak terpengaruh: panggilan tingkat batchnya telah dimodifikasi agar berkinerja. Mendukung panggilan tingkat batch untuk ParameterServerStrategy sedang direncanakan.
  • Untuk alasan yang sama, tidak seperti strategi lain, bilah kemajuan dan metrik dicatat hanya pada batas zaman.
  • run_eagerly tidak didukung.

Spesifik loop pelatihan khusus