Lihat di TensorFlow.org | Jalankan di Google Colab | Lihat sumber di GitHub | Unduh buku catatan |
Ringkasan
Pelatihan server parameter adalah metode paralel data umum untuk meningkatkan pelatihan model pada beberapa mesin.
Sebuah cluster pelatihan server parameter terdiri dari pekerja dan server parameter . Variabel dibuat di server parameter dan dibaca serta diperbarui oleh pekerja di setiap langkah. Secara default, pekerja membaca dan memperbarui variabel ini secara independen tanpa menyinkronkan satu sama lain. Inilah sebabnya mengapa terkadang pelatihan gaya server parameter disebut pelatihan asinkron .
Di TensorFlow 2, pelatihan server parameter didukung oleh kelas tf.distribute.experimental.ParameterServerStrategy
, yang mendistribusikan langkah-langkah pelatihan ke cluster yang menskalakan hingga ribuan pekerja (disertai dengan server parameter).
Metode pelatihan yang didukung
Ada dua metode pelatihan utama yang didukung:
- Keras
Model.fit
API, yang direkomendasikan saat Anda lebih memilih abstraksi tingkat tinggi dan penanganan pelatihan. - Loop pelatihan kustom (Anda dapat merujuk ke Pelatihan kustom , Menulis loop pelatihan dari awal dan Loop pelatihan kustom dengan Keras dan MultiWorkerMirroredStrategy untuk detail selengkapnya.) Pelatihan loop kustom direkomendasikan bila Anda lebih memilih untuk menentukan detail loop pelatihannya.
Sebuah cluster dengan pekerjaan dan tugas
Terlepas dari API pilihan ( Model.fit
atau loop pelatihan khusus), pelatihan terdistribusi di TensorFlow 2 melibatkan: 'cluster'
dengan beberapa 'jobs'
, dan setiap tugas mungkin memiliki satu atau beberapa 'tasks'
.
Saat menggunakan pelatihan server parameter, disarankan untuk memiliki:
- Satu pekerjaan koordinator (yang memiliki nama pekerjaan
chief
) - Beberapa pekerjaan pekerja (pekerjaan nama
worker
); dan - Beberapa pekerjaan server parameter (nama pekerjaan
ps
)
Sementara koordinator membuat sumber daya, mengirimkan tugas pelatihan, menulis pos pemeriksaan, dan menangani kegagalan tugas, pekerja dan server parameter menjalankan tf.distribute.Server
yang mendengarkan permintaan dari koordinator.
Pelatihan server parameter dengan Model.fit
API
Pelatihan server parameter dengan Model.fit
API memerlukan koordinator untuk menggunakan objek tf.distribute.experimental.ParameterServerStrategy
, dan tf.keras.utils.experimental.DatasetCreator
sebagai input. Mirip dengan penggunaan Model.fit
tanpa strategi, atau dengan strategi lain, alur kerja melibatkan pembuatan dan kompilasi model, menyiapkan callback, diikuti dengan panggilan Model.fit
.
Pelatihan server parameter dengan loop pelatihan khusus
Dengan loop pelatihan khusus, kelas tf.distribute.experimental.coordinator.ClusterCoordinator
adalah komponen kunci yang digunakan untuk koordinator.
- Kelas
ClusterCoordinator
perlu bekerja bersama dengan objektf.distribute.Strategy
. - Objek
tf.distribute.Strategy
ini diperlukan untuk memberikan informasi cluster dan digunakan untuk menentukan langkah pelatihan, seperti yang ditunjukkan dalam pelatihan Kustom dengan tf.distribute.Strategy . - Objek
ClusterCoordinator
kemudian mengirimkan eksekusi langkah-langkah pelatihan ini ke pekerja jarak jauh. - Untuk pelatihan server parameter,
ClusterCoordinator
perlu bekerja dengantf.distribute.experimental.ParameterServerStrategy
.
API terpenting yang disediakan oleh objek ClusterCoordinator
adalah schedule
:
- API
schedule
mengantrekantf.function
dan segera mengembalikanRemoteValue
seperti masa depan. - Fungsi antrian akan dikirim ke pekerja jarak jauh di utas latar belakang dan
RemoteValue
s mereka akan diisi secara asinkron. - Karena
schedule
tidak memerlukan penugasan pekerja, fungsitf.function
diteruskan dapat dijalankan pada pekerja mana pun yang tersedia. - Jika pekerja yang dieksekusi menjadi tidak tersedia sebelum selesai, fungsi tersebut akan dicoba kembali pada pekerja lain yang tersedia.
- Karena fakta ini dan fakta bahwa eksekusi fungsi tidak atomik, suatu fungsi dapat dieksekusi lebih dari satu kali.
Selain mengirimkan fungsi jarak jauh, ClusterCoordinator
juga membantu membuat kumpulan data pada semua pekerja dan membangun kembali kumpulan data ini saat pekerja pulih dari kegagalan.
Pengaturan tutorial
Tutorial akan bercabang menjadi Model.fit
dan jalur loop pelatihan khusus, dan Anda dapat memilih salah satu yang sesuai dengan kebutuhan Anda. Bagian selain "Pelatihan dengan X" berlaku untuk kedua jalur.
pip install portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
Pengaturan kluster
Seperti disebutkan di atas, kluster pelatihan server parameter memerlukan tugas koordinator yang menjalankan program pelatihan Anda, satu atau beberapa pekerja dan tugas server parameter yang menjalankan server tf.distribute.Server
—dan mungkin tugas evaluasi tambahan yang menjalankan evaluasi side-car (lihat bagian evaluasi mobil samping di bawah). Persyaratan untuk mengaturnya adalah:
- Tugas koordinator perlu mengetahui alamat dan port semua server TensorFlow lainnya kecuali evaluator.
- Pekerja dan server parameter perlu mengetahui port mana yang perlu mereka dengarkan. Demi kesederhanaan, Anda biasanya dapat meneruskan informasi cluster lengkap saat membuat server TensorFlow pada tugas ini.
- Tugas evaluator tidak harus mengetahui penyiapan kluster pelatihan. Jika ya, seharusnya tidak mencoba menyambung ke kluster pelatihan.
- Pekerja dan server parameter harus memiliki jenis tugas masing-masing sebagai
"worker"
dan"ps"
. Koordinator harus menggunakan"chief"
sebagai jenis tugas untuk alasan warisan.
Dalam tutorial ini, Anda akan membuat cluster dalam proses sehingga pelatihan seluruh parameter server dapat dijalankan di Colab. Anda akan mempelajari cara menyiapkan kluster nyata di bagian selanjutnya.
Cluster dalam proses
Anda akan mulai dengan membuat beberapa server TensorFlow terlebih dahulu dan menghubungkannya nanti. Perhatikan bahwa ini hanya untuk tujuan demonstrasi tutorial ini, dan dalam pelatihan nyata server akan dimulai pada mesin "worker"
dan "ps"
.
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name="worker",
task_index=i,
config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec,
job_name="ps",
task_index=i,
protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
Pengaturan cluster dalam proses sering digunakan dalam pengujian unit, seperti di sini .
Pilihan lain untuk pengujian lokal adalah meluncurkan proses pada mesin lokal—lihat Pelatihan multi-pekerja dengan Keras untuk contoh pendekatan ini.
Buat Instansi ParameterServerStrategy
Sebelum Anda mendalami kode pelatihan, mari kita buat instance objek ParameterServerStrategy
. Perhatikan bahwa ini diperlukan terlepas dari apakah Anda melanjutkan dengan Model.fit
atau loop pelatihan khusus. Argumen variable_partitioner
akan dijelaskan di bagian Variable sharding .
variable_partitioner = (
tf.distribute.experimental.partitioners.MinSizePartitioner(
min_shard_bytes=(256 << 10),
max_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0' INFO:tensorflow:Number of GPUs on workers: 1
Untuk menggunakan GPU untuk pelatihan, alokasikan GPU yang terlihat oleh setiap pekerja. ParameterServerStrategy
akan menggunakan semua GPU yang tersedia pada setiap pekerja, dengan batasan bahwa semua pekerja harus memiliki jumlah GPU yang sama.
Pecahan variabel
Pecahan variabel mengacu pada pemisahan variabel menjadi beberapa variabel yang lebih kecil, yang disebut pecahan . Pecahan variabel mungkin berguna untuk mendistribusikan beban jaringan saat mengakses pecahan ini. Hal ini juga berguna untuk mendistribusikan komputasi dan penyimpanan variabel normal di beberapa server parameter.
Untuk mengaktifkan sharding variabel, Anda dapat memasukkan variable_partitioner
saat membuat objek ParameterServerStrategy
. variable_partitioner
akan dipanggil setiap kali variabel dibuat dan diharapkan untuk mengembalikan jumlah pecahan di sepanjang setiap dimensi variabel. Beberapa variable_partitioner
s out-of-box disediakan seperti tf.distribute.experimental.partitioners.MinSizePartitioner
. Disarankan untuk menggunakan partisi berbasis ukuran seperti tf.distribute.experimental.partitioners.MinSizePartitioner
untuk menghindari mempartisi variabel kecil, yang dapat berdampak negatif pada kecepatan pelatihan model.
Ketika variable_partitioner
diteruskan dan jika Anda membuat variabel langsung di bawah strategy.scope()
, itu akan menjadi tipe penampung dengan properti variables
yang menyediakan akses ke daftar pecahan. Dalam kebanyakan kasus, penampung ini akan secara otomatis dikonversi ke Tensor dengan menggabungkan semua pecahan. Akibatnya, dapat digunakan sebagai variabel normal. Di sisi lain, beberapa metode TensorFlow seperti tf.nn.embedding_lookup
menyediakan implementasi yang efisien untuk jenis penampung ini dan dalam metode ini penggabungan otomatis akan dihindari.
Silakan lihat dokumen API tf.distribute.experimental.ParameterServerStrategy
untuk detail selengkapnya.
Pelatihan dengan Model.fit
Keras menyediakan API pelatihan yang mudah digunakan melalui Model.fit
yang menangani loop pelatihan di bawah tenda, dengan fleksibilitas train_step
yang dapat diganti, dan callback, yang menyediakan fungsionalitas seperti penyimpanan pos pemeriksaan atau penyimpanan ringkasan untuk TensorBoard. Dengan Model.fit
, kode pelatihan yang sama dapat digunakan untuk strategi lain dengan pertukaran sederhana dari objek strategi.
Memasukan data
Model.fit
dengan pelatihan server parameter mengharuskan data input disediakan dalam callable yang mengambil argumen tunggal bertipe tf.distribute.InputContext
, dan mengembalikan tf.data.Dataset
. Kemudian, buat objek tf.keras.utils.experimental.DatasetCreator
yang menggunakan callable
tersebut, dan objek tf.distribute.InputOptions
opsional melalui argumen input_options
.
Perhatikan bahwa dianjurkan untuk mengacak dan mengulang data dengan pelatihan parameter server, dan menentukan steps_per_epoch
dalam panggilan yang fit
sehingga perpustakaan mengetahui batas-batas zaman.
Silakan lihat tutorial input Terdistribusi untuk informasi lebih lanjut tentang argumen InputContext
.
def dataset_fn(input_context):
global_batch_size = 64
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.shard(
input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2)
return dataset
dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
Kode di dataset_fn
akan dipanggil pada perangkat input, yang biasanya CPU, pada setiap mesin pekerja.
Konstruksi dan kompilasi model
Sekarang, Anda akan membuat tf.keras.Model
tf.keras.models.Sequential
sepele untuk tujuan demonstrasi—diikuti oleh panggilan Model.compile
untuk memasukkan komponen, seperti pengoptimal, metrik, atau parameter seperti steps_per_execution
:
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)
Panggilan balik dan pelatihan
Sebelum Anda memanggil model.fit
untuk pelatihan yang sebenarnya, mari siapkan panggilan balik yang diperlukan untuk tugas-tugas umum, seperti:
-
ModelCheckpoint
: untuk menyimpan bobot model. -
BackupAndRestore
: untuk memastikan progres pelatihan dicadangkan secara otomatis, dan dipulihkan jika cluster mengalami unavailability (seperti abort atau preemption); atau -
TensorBoard
: untuk menyimpan laporan kemajuan ke dalam file ringkasan, yang divisualisasikan di alat TensorBoard.
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=log_dir),
tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step Epoch 2/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step Epoch 3/5 WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. 20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step Epoch 4/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step Epoch 5/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step <keras.callbacks.History at 0x7f89984ca890>
Penggunaan langsung dengan ClusterCoordinator
(opsional)
Bahkan jika Anda memilih jalur pelatihan Model.fit
, Anda dapat secara opsional membuat instance objek tf.distribute.experimental.coordinator.ClusterCoordinator
untuk menjadwalkan fungsi lain yang ingin Anda jalankan pada pekerja. Lihat bagian Pelatihan dengan loop pelatihan khusus untuk detail dan contoh selengkapnya.
Pelatihan dengan loop pelatihan khusus
Menggunakan loop pelatihan khusus dengan tf.distribute.Strategy
memberikan fleksibilitas tinggi untuk menentukan loop pelatihan. Dengan ParameterServerStrategy
yang didefinisikan di atas (sebagai strategy
), Anda akan menggunakan tf.distribute.experimental.coordinator.ClusterCoordinator
untuk mengirimkan eksekusi langkah-langkah pelatihan ke pekerja jarak jauh.
Kemudian, Anda akan membuat model, mendefinisikan kumpulan data dan fungsi langkah, seperti yang telah Anda lakukan di loop pelatihan dengan tf.distribute.Strategy
s lainnya. Anda dapat menemukan detail lebih lanjut di pelatihan Kustom dengan tutorial tf.distribute.Strategy .
Untuk memastikan pengambilan awal set data yang efisien, gunakan API pembuatan set data terdistribusi yang direkomendasikan yang disebutkan di bagian Langkah pelatihan pengiriman ke pekerja jarak jauh di bawah ini. Selain itu, pastikan untuk memanggil Strategy.run
di dalam worker_fn
untuk memanfaatkan sepenuhnya GPU yang dialokasikan untuk pekerja. Langkah-langkah selanjutnya sama untuk pelatihan dengan atau tanpa GPU.
Mari kita buat komponen-komponen ini dalam langkah-langkah berikut:
Siapkan datanya
Pertama, tulis fungsi yang membuat kumpulan data yang menyertakan logika prapemrosesan yang diimplementasikan oleh lapisan prapemrosesan Keras .
Anda akan membuat lapisan ini di luar dataset_fn
tetapi menerapkan transformasi di dalam dataset_fn
, karena Anda akan membungkus dataset_fn
menjadi tf.function
, yang tidak mengizinkan variabel dibuat di dalamnya.
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=feature_vocab,
mask_token=None)
label_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = tf.keras.layers.Input(
shape=(3,),
dtype=tf.string,
name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = tf.keras.Model(
{"features": raw_feature_input},
feature_id_input)
raw_label_input = tf.keras.layers.Input(
shape=(1,),
dtype=tf.string,
name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = tf.keras.Model(
{"label": raw_label_input},
label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison return bool(asarray(a1 == a2).all())
Buat contoh mainan dalam kumpulan data:
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
Kemudian, buat set data pelatihan yang dibungkus dengan dataset_fn
:
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
Bangun modelnya
Selanjutnya, buat model dan objek lainnya. Pastikan untuk membuat semua variabel di bawah strategy.scope
.
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with Keras processing layers.
model_input = tf.keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = tf.keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
model = tf.keras.Model({"features": model_input}, dense_output)
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
accuracy = tf.keras.metrics.Accuracy()
Mari kita konfirmasikan bahwa penggunaan FixedShardsPartitioner
membagi semua variabel menjadi dua pecahan dan setiap pecahan ditugaskan ke server parameter yang berbeda:
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
Tentukan langkah pelatihan
Ketiga, buat langkah pelatihan yang dibungkus dengan tf.function
:
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
Dalam fungsi langkah pelatihan di atas, memanggil Strategy.run
dan Strategy.reduce
di step_fn
dapat mendukung beberapa GPU per pekerja. Jika para pekerja memiliki GPU yang dialokasikan, Strategy.run
akan mendistribusikan kumpulan data pada beberapa replika.
Mengirimkan langkah-langkah pelatihan ke pekerja jarak jauh
Setelah semua perhitungan ditentukan oleh ParameterServerStrategy
, Anda akan menggunakan kelas tf.distribute.experimental.coordinator.ClusterCoordinator
untuk membuat sumber daya dan mendistribusikan langkah-langkah pelatihan ke pekerja jarak jauh.
Pertama-tama mari kita buat objek ClusterCoordinator
dan berikan objek strategi:
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
Kemudian, buat set data per pekerja dan iterator. Dalam per_worker_dataset_fn
di bawah ini, disarankan untuk dataset_fn
ke dalam strategy.distribute_datasets_from_function
untuk memungkinkan pra-pengambilan yang efisien ke GPU dengan mulus.
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
Langkah terakhir adalah mendistribusikan perhitungan ke pekerja jarak jauh menggunakan ClusterCoordinator.schedule
:
- Metode
schedule
mengantrekantf.function
dan segera mengembalikanRemoteValue
seperti masa depan. Fungsi yang diantrekan akan dikirim ke pekerja jarak jauh di utas latar belakang danRemoteValue
akan diisi secara asinkron. - Metode
join
(ClusterCoordinator.join
) dapat digunakan untuk menunggu sampai semua fungsi terjadwal dieksekusi.
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). Finished epoch 0, accuracy is 0.543750. Finished epoch 1, accuracy is 0.543750. Finished epoch 2, accuracy is 0.950000. Finished epoch 3, accuracy is 1.000000.
Inilah cara Anda mengambil hasil dari RemoteValue
:
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000
Atau, Anda dapat meluncurkan semua langkah dan melakukan sesuatu sambil menunggu penyelesaian:
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
Untuk pelatihan lengkap dan alur kerja penyajian untuk contoh khusus ini, silakan lihat pengujian ini .
Selengkapnya tentang pembuatan kumpulan data
Dataset dalam kode di atas dibuat menggunakan ClusterCoordinator.create_per_worker_dataset
API). Ini membuat satu set data per pekerja dan mengembalikan objek kontainer. Anda dapat memanggil metode iter
untuk membuat iterator per pekerja. Iterator per pekerja berisi satu iterator per pekerja dan potongan pekerja yang sesuai akan diganti dalam argumen input dari fungsi yang diteruskan ke metode ClusterCoordinator.schedule
sebelum fungsi dijalankan pada pekerja tertentu.
Saat ini, metode ClusterCoordinator.schedule
mengasumsikan pekerja setara dan dengan demikian mengasumsikan kumpulan data pada pekerja yang berbeda adalah sama kecuali mereka mungkin diacak secara berbeda jika berisi operasi Dataset.shuffle
. Karena itu, juga disarankan agar kumpulan data diulang tanpa batas waktu dan Anda menjadwalkan sejumlah langkah yang terbatas daripada mengandalkan OutOfRangeError
dari kumpulan data.
Catatan penting lainnya adalah bahwa kumpulan data tf.data
tidak mendukung serialisasi dan deserialisasi implisit melintasi batas tugas. Jadi, penting untuk membuat seluruh dataset di dalam fungsi yang diteruskan ke ClusterCoordinator.create_per_worker_dataset
.
Evaluasi
Ada lebih dari satu cara untuk mendefinisikan dan menjalankan lingkaran evaluasi dalam pelatihan terdistribusi. Masing-masing memiliki pro dan kontra sendiri seperti yang dijelaskan di bawah ini. Metode evaluasi sebaris disarankan jika Anda tidak memiliki preferensi.
Evaluasi sebaris
Dalam metode ini, koordinator bergantian antara pelatihan dan evaluasi dan oleh karena itu disebut evaluasi sebaris .
Ada beberapa manfaat dari evaluasi inline. Sebagai contoh:
- Ini dapat mendukung model evaluasi besar dan kumpulan data evaluasi yang tidak dapat dimiliki oleh satu tugas.
- Hasil evaluasi dapat digunakan untuk mengambil keputusan untuk pelatihan epoch selanjutnya.
Ada dua cara untuk menerapkan evaluasi inline: evaluasi langsung dan evaluasi terdistribusi.
- Evaluasi langsung : Untuk model kecil dan dataset evaluasi, koordinator dapat menjalankan evaluasi secara langsung pada model terdistribusi dengan dataset evaluasi pada koordinator:
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = tf.keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
- Evaluasi terdistribusi : Untuk model besar atau kumpulan data yang tidak layak dijalankan langsung pada koordinator, tugas koordinator dapat mendistribusikan tugas evaluasi kepada pekerja melalui metode
ClusterCoordinator.schedule
/ClusterCoordinator.join
:
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = tf.keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources Evaluation accuracy: 1.000000
Evaluasi mobil samping
Metode lain disebut evaluasi mobil samping di mana Anda membuat tugas evaluator khusus yang berulang kali membaca pos pemeriksaan dan menjalankan evaluasi pada pos pemeriksaan terbaru. Ini memungkinkan program pelatihan Anda selesai lebih awal jika Anda tidak perlu mengubah lingkaran pelatihan berdasarkan hasil evaluasi. Namun, ini membutuhkan tugas evaluator tambahan dan pos pemeriksaan berkala untuk memicu evaluasi. Berikut ini adalah kemungkinan putaran evaluasi mobil samping:
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epoches)):
break
Cluster di dunia nyata
Dalam lingkungan produksi nyata, Anda akan menjalankan semua tugas dalam proses yang berbeda pada mesin yang berbeda. Cara paling sederhana untuk mengonfigurasi informasi cluster pada setiap tugas adalah dengan menyetel variabel lingkungan "TF_CONFIG"
dan menggunakan tf.distribute.cluster_resolver.TFConfigClusterResolver
untuk mengurai "TF_CONFIG"
.
Untuk deskripsi umum tentang variabel lingkungan "TF_CONFIG"
, lihat panduan pelatihan Terdistribusi .
Jika Anda memulai tugas pelatihan menggunakan Kubernetes atau template konfigurasi lainnya, kemungkinan besar template ini telah menyetel “TF_CONFIG"
untuk Anda.
Setel variabel lingkungan "TF_CONFIG"
Misalkan Anda memiliki 3 pekerja dan 2 server parameter, "TF_CONFIG"
pekerja 1 dapat berupa:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
"TF_CONFIG"
dari evaluator dapat berupa:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
Bagian "cluster"
dalam string "TF_CONFIG"
di atas untuk evaluator adalah opsional.
Jika Anda menggunakan biner yang sama untuk semua tugas
Jika Anda lebih suka menjalankan semua tugas ini menggunakan biner tunggal, Anda harus membiarkan program Anda bercabang ke peran yang berbeda di awal:
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# Run side-car evaluation
else:
# Run the coordinator.
Kode berikut memulai server TensorFlow dan menunggu:
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
Menangani kegagalan tugas
Kegagalan pekerja
tf.distribute.experimental.coordinator.ClusterCoordinator
atau Model.fit
menyediakan toleransi kesalahan bawaan untuk kegagalan pekerja. Setelah pemulihan pekerja, fungsi kumpulan data yang disediakan sebelumnya (baik ke ClusterCoordinator.create_per_worker_dataset
untuk loop pelatihan khusus, atau tf.keras.utils.experimental.DatasetCreator
untuk Model.fit
) akan dipanggil pada pekerja untuk membuat kembali kumpulan data.
Server parameter atau kegagalan koordinator
Namun, ketika koordinator melihat kesalahan server parameter, itu akan segera memunculkan UnavailableError
atau AbortedError
. Anda dapat memulai ulang koordinator dalam kasus ini. Koordinator itu sendiri juga bisa menjadi tidak tersedia. Oleh karena itu, perkakas tertentu direkomendasikan agar tidak kehilangan kemajuan pelatihan:
Untuk
Model.fit
, Anda harus menggunakan callbackBackupAndRestore
, yang menangani penyimpanan dan pemulihan progres secara otomatis. Lihat bagian Callback dan pelatihan di atas untuk contoh.Untuk loop pelatihan khusus, Anda harus memeriksa variabel model secara berkala dan memuat variabel model dari pos pemeriksaan, jika ada, sebelum pelatihan dimulai. Kemajuan pelatihan dapat disimpulkan kira-kira dari
optimizer.iterations
. iterasi jika pengoptimal diperiksa:
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epoches):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
Mengambil RemoteValue
Mengambil RemoteValue
dijamin berhasil jika suatu fungsi berhasil dijalankan. Ini karena saat ini nilai kembalian segera disalin ke koordinator setelah suatu fungsi dijalankan. Jika ada kegagalan pekerja selama penyalinan, fungsi tersebut akan dicoba kembali pada pekerja lain yang tersedia. Oleh karena itu, jika Anda ingin mengoptimalkan kinerja, Anda dapat menjadwalkan fungsi tanpa nilai balik.
Pelaporan kesalahan
Setelah koordinator melihat kesalahan seperti UnavailableError
dari server parameter atau kesalahan aplikasi lain seperti InvalidArgument
dari tf.debugging.check_numerics
, itu akan membatalkan semua fungsi tertunda dan antri sebelum meningkatkan kesalahan. Mengambil RemoteValue
s yang sesuai akan memunculkan CancelledError
.
Setelah kesalahan dimunculkan, koordinator tidak akan memunculkan kesalahan yang sama atau kesalahan apa pun dari fungsi yang dibatalkan.
Peningkatan performa
Ada beberapa kemungkinan alasan jika Anda melihat masalah kinerja saat Anda berlatih dengan ParameterServerStrategy
dan ClusterResolver
.
Salah satu alasan umum adalah server parameter memiliki beban yang tidak seimbang dan beberapa server parameter dengan beban berat telah mencapai kapasitas. Ada juga beberapa akar penyebab. Beberapa metode sederhana untuk mengurangi masalah ini adalah dengan:
- Pisahkan variabel model besar Anda dengan menentukan
variable_partitioner
saat membuatParameterServerStrategy
. - Hindari membuat variabel hotspot yang diperlukan oleh semua server parameter dalam satu langkah jika memungkinkan. Misalnya, gunakan tingkat pembelajaran konstan atau subkelas
tf.keras.optimizers.schedules.LearningRateSchedule
di pengoptimal karena perilaku default adalah bahwa tingkat pembelajaran akan menjadi variabel yang ditempatkan pada server parameter tertentu dan diminta oleh semua server parameter lain di setiap langkah . - Acak kosakata besar Anda sebelum meneruskannya ke lapisan prapemrosesan Keras.
Alasan lain yang mungkin untuk masalah kinerja adalah koordinator. Implementasi schedule
/ join
pertama Anda berbasis Python dan karenanya mungkin memiliki overhead threading. Latensi antara koordinator dan pekerja juga bisa besar. Jika ini masalahnya,
Untuk
Model.fit
, Anda dapat mengatur argumensteps_per_execution
yang disediakan diModel.compile
ke nilai yang lebih besar dari 1.Untuk loop pelatihan khusus, Anda dapat mengemas beberapa langkah ke dalam satu
tf.function
:
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
Karena perpustakaan dioptimalkan lebih lanjut, semoga sebagian besar pengguna tidak perlu mengemas langkah-langkah secara manual di masa mendatang.
Selain itu, trik kecil untuk peningkatan kinerja adalah menjadwalkan fungsi tanpa nilai kembalian seperti yang dijelaskan di bagian kegagalan tugas penanganan di atas.
Batasan yang diketahui
Sebagian besar batasan yang diketahui sudah tercakup dalam bagian di atas. Bagian ini memberikan ringkasan.
ParameterServerStrategy
umum
-
os.environment["grpc_fail_fast"]="use_caller"
diperlukan pada setiap tugas termasuk koordinator, untuk membuat toleransi kesalahan bekerja dengan baik. - Pelatihan server parameter sinkron tidak didukung.
- Biasanya diperlukan untuk mengemas beberapa langkah ke dalam satu fungsi untuk mencapai kinerja yang optimal.
- Tidak didukung untuk memuat save_model melalui
tf.saved_model.load
yang berisi variabel sharded. Catatan memuat save_model seperti itu menggunakan TensorFlow Serving diharapkan berfungsi. - Tidak didukung untuk memuat pos pemeriksaan yang berisi variabel slot pengoptimal pecahan ke dalam jumlah pecahan yang berbeda.
- Tidak didukung untuk memulihkan dari kegagalan server parameter tanpa memulai ulang tugas koordinator.
- Penggunaan
tf.lookup.StaticHashTable
(yang biasanya digunakan oleh beberapa lapisan prapemrosesan Keras, sepertitf.keras.layers.IntegerLookup
,tf.keras.layers.StringLookup
, dantf.keras.layers.TextVectorization
) menghasilkan sumber daya yang ditempatkan pada koordinator saat ini dengan pelatihan parameter server. Ini memiliki implikasi kinerja untuk RPC pencarian dari pekerja ke koordinator. Ini adalah prioritas tinggi saat ini untuk ditangani.
Spesifikasi Model.fit
- argumen
steps_per_epoch
diperlukan diModel.fit
. Anda dapat memilih nilai yang menyediakan interval yang sesuai dalam suatu zaman. -
ParameterServerStrategy
tidak memiliki dukungan untuk panggilan balik kustom yang memiliki panggilan tingkat batch untuk alasan kinerja. Anda harus mengonversi panggilan tersebut menjadi panggilan tingkat Epos dengansteps_per_epoch
yang dipilih dengan tepat, sehingga panggilan tersebut dipanggil setiapsteps_per_epoch
sejumlah langkah. Callback bawaan tidak terpengaruh: panggilan tingkat batchnya telah dimodifikasi agar berkinerja. Mendukung panggilan tingkat batch untukParameterServerStrategy
sedang direncanakan. - Untuk alasan yang sama, tidak seperti strategi lain, bilah kemajuan dan metrik dicatat hanya pada batas zaman.
-
run_eagerly
tidak didukung.
Spesifik loop pelatihan khusus
-
ClusterCoordinator.schedule
tidak mendukung jaminan kunjungan untuk kumpulan data. - Ketika
ClusterCoordinator.create_per_worker_dataset
digunakan, seluruh dataset harus dibuat di dalam fungsi yang diteruskan ke sana. -
tf.data.Options
diabaikan dalam kumpulan data yang dibuat olehClusterCoordinator.create_per_worker_dataset
.