ดูบน TensorFlow.org | ทำงานใน Google Colab | ดูแหล่งที่มาบน GitHub | ดาวน์โหลดโน๊ตบุ๊ค |
ภาพรวม
บทช่วยสอนนี้สาธิตวิธีดำเนินการฝึกอบรมแบบกระจายผู้ปฏิบัติงานหลายคนด้วยโมเดล Keras และ Model.fit
API โดยใช้ tf.distribute.Strategy
API โดยเฉพาะคลาส tf.distribute.MultiWorkerMirroredStrategy
ด้วยความช่วยเหลือของกลยุทธ์นี้ โมเดล Keras ที่ออกแบบมาให้ทำงานบนผู้ปฏิบัติงานคนเดียวสามารถทำงานกับผู้ปฏิบัติงานหลายคนได้อย่างราบรื่นโดยมีการเปลี่ยนแปลงโค้ดเพียงเล็กน้อย
สำหรับผู้ที่สนใจเข้าใจอย่างลึกซึ้งยิ่งขึ้นเกี่ยวกับ tf.distribute.Strategy
APIs การ ฝึกอบรมแบบกระจายในคู่มือ TensorFlow จะมีให้สำหรับภาพรวมของกลยุทธ์การจัดจำหน่ายที่ TensorFlow รองรับ
หากต้องการเรียนรู้วิธีใช้ MultiWorkerMirroredStrategy
กับ Keras และลูปการฝึกแบบกำหนดเอง โปรดดูที่ Custom training loop กับ Keras และ MultiWorkerMirroredStrategy
โปรดทราบว่าจุดประสงค์ของบทช่วยสอนนี้คือเพื่อแสดงตัวอย่างผู้ปฏิบัติงานหลายคนขั้นต่ำที่มีผู้ปฏิบัติงานสองคน
ติดตั้ง
เริ่มต้นด้วยการนำเข้าที่จำเป็น:
import json
import os
import sys
ก่อนนำเข้า TensorFlow ให้ทำการเปลี่ยนแปลงบางอย่างกับสภาพแวดล้อม:
- ปิดการใช้งาน GPU ทั้งหมด ซึ่งจะป้องกันข้อผิดพลาดที่เกิดจากพนักงานทุกคนที่พยายามใช้ GPU เดียวกัน ในการใช้งานจริง ผู้ปฏิบัติงานแต่ละคนจะอยู่คนละเครื่องกัน
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
- รีเซ็ตตัวแปรสภาพแวดล้อม
TF_CONFIG
(คุณจะได้เรียนรู้เพิ่มเติมเกี่ยวกับสิ่งนี้ในภายหลัง):
os.environ.pop('TF_CONFIG', None)
- ตรวจสอบให้แน่ใจว่าไดเร็กทอรีปัจจุบันอยู่บนเส้นทางของ Python ซึ่งจะทำให้โน้ตบุ๊กนำเข้าไฟล์ที่เขียนโดย
%%writefile
ได้ในภายหลัง:
if '.' not in sys.path:
sys.path.insert(0, '.')
ตอนนี้นำเข้า TensorFlow:
import tensorflow as tf
นิยามชุดข้อมูลและโมเดล
ถัดไป สร้างไฟล์ mnist_setup.py
ด้วยการตั้งค่าโมเดลและชุดข้อมูลอย่างง่าย ไฟล์ Python นี้จะถูกใช้โดยกระบวนการของผู้ปฏิบัติงานในบทช่วยสอนนี้:
%%writefile mnist_setup.py
import os
import tensorflow as tf
import numpy as np
def mnist_dataset(batch_size):
(x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
# The `x` arrays are in uint8 and have values in the [0, 255] range.
# You need to convert them to float32 with values in the [0, 1] range.
x_train = x_train / np.float32(255)
y_train = y_train.astype(np.int64)
train_dataset = tf.data.Dataset.from_tensor_slices(
(x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
return train_dataset
def build_and_compile_cnn_model():
model = tf.keras.Sequential([
tf.keras.layers.InputLayer(input_shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
tf.keras.layers.Conv2D(32, 3, activation='relu'),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(10)
])
model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
return model
Writing mnist_setup.py
อบรมต้นแบบกับคนทำงานคนเดียว
ลองฝึกแบบจำลองสำหรับยุคสมัยจำนวนน้อยๆ และสังเกตผลลัพธ์ของ ผู้ปฏิบัติงานคนเดียว เพื่อให้แน่ใจว่าทุกอย่างทำงานอย่างถูกต้อง เมื่อการฝึกดำเนินไป การสูญเสียควรลดลงและความแม่นยำควรเพิ่มขึ้น
import mnist_setup
batch_size = 64
single_worker_dataset = mnist_setup.mnist_dataset(batch_size)
single_worker_model = mnist_setup.build_and_compile_cnn_model()
single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz 11493376/11490434 [==============================] - 0s 0us/step 11501568/11490434 [==============================] - 0s 0us/step 2022-02-05 02:20:59.945141: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected Epoch 1/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2839 - accuracy: 0.1788 Epoch 2/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2492 - accuracy: 0.3185 Epoch 3/3 70/70 [==============================] - 1s 12ms/step - loss: 2.2012 - accuracy: 0.4795 <keras.callbacks.History at 0x7f666a2e4510>
การกำหนดค่าผู้ปฏิบัติงานหลายคน
มาเข้าสู่โลกของการฝึกอบรมผู้ปฏิบัติงานหลายคนกันเถอะ
คลัสเตอร์ที่มีงานและงาน
ใน TensorFlow การฝึกอบรมแบบกระจายเกี่ยวข้องกับ: 'cluster'
ที่มีหลายงาน และแต่ละงานอาจมี 'task'
หนึ่งงานขึ้นไป
คุณจะต้องใช้ตัวแปรสภาพแวดล้อมการกำหนดค่า TF_CONFIG
สำหรับการฝึกอบรมบนเครื่องหลายเครื่อง ซึ่งแต่ละเครื่องอาจมีบทบาทที่แตกต่างกัน TF_CONFIG
เป็นสตริง JSON ที่ใช้ในการระบุการกำหนดค่าคลัสเตอร์สำหรับผู้ปฏิบัติงานแต่ละคนที่เป็นส่วนหนึ่งของคลัสเตอร์
มีสององค์ประกอบของตัวแปร TF_CONFIG
: 'cluster'
และ 'task'
'cluster'
จะเหมือนกันสำหรับผู้ปฏิบัติงานทุกคน และให้ข้อมูลเกี่ยวกับคลัสเตอร์การฝึกอบรม ซึ่งเป็นคำสั่งที่ประกอบด้วยงานประเภทต่างๆ เช่น'worker'
หรือ'chief'
- ในการฝึกอบรมผู้ปฏิบัติงานหลายคนด้วย
tf.distribute.MultiWorkerMirroredStrategy
มักจะมี'worker'
หนึ่งที่รับผิดชอบ เช่น การบันทึกจุดตรวจและการเขียนไฟล์สรุปสำหรับ TensorBoard นอกเหนือจากสิ่งที่'worker'
ปกติทำ'worker'
ดังกล่าวเรียกว่า chief worker (มีชื่องานว่า'chief'
) - เป็นเรื่องปกติสำหรับ
'chief'
ที่จะต้องแต่งตั้ง'index'
0
ให้ (อันที่จริง นี่คือวิธีการใช้งานtf.distribute.Strategy
)
- ในการฝึกอบรมผู้ปฏิบัติงานหลายคนด้วย
'task'
ให้ข้อมูลของงานปัจจุบันและแตกต่างกันไปสำหรับผู้ปฏิบัติงานแต่ละคน ระบุ'type'
และ'index'
ของผู้ปฏิบัติงานนั้น
ด้านล่างนี้คือตัวอย่างการกำหนดค่า:
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': 0}
}
นี่คือ TF_CONFIG
เดียวกันที่จัดลำดับเป็นสตริง JSON:
json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'
โปรดทราบว่า tf_config
เป็นเพียงตัวแปรโลคัลใน Python เพื่อให้สามารถใช้สำหรับการกำหนดค่าการฝึกอบรม dict นี้ต้องได้รับการจัดลำดับเป็น JSON และวางไว้ในตัวแปรสภาพแวดล้อม TF_CONFIG
ในการกำหนดค่าตัวอย่างด้านบน คุณตั้งค่างาน 'type'
เป็น 'worker'
และงาน 'index'
เป็น 0
ดังนั้นเครื่องนี้จึงเป็นผู้ปฏิบัติงานคน แรก โดยจะได้รับการแต่งตั้งให้เป็น 'chief'
ผู้ปฏิบัติงานและทำงานมากกว่าคนอื่นๆ
เพื่อจุดประสงค์ในการอธิบายประกอบ บทช่วยสอนนี้จะแสดงวิธีตั้งค่าตัวแปร TF_CONFIG
กับผู้ปฏิบัติงานสองคนบน localhost
ในทางปฏิบัติ คุณจะต้องสร้างผู้ปฏิบัติงานหลายคนบนที่อยู่/พอร์ต IP ภายนอก และตั้งค่าตัวแปร TF_CONFIG
ให้กับผู้ปฏิบัติงานแต่ละคนตามลำดับ
ในบทช่วยสอนนี้ คุณจะใช้คนงานสองคน:
-
TF_CONFIG
ของผู้ปฏิบัติงานคนแรก ('chief'
) แสดงไว้ด้านบน - สำหรับผู้ปฏิบัติงานคนที่สอง คุณจะต้องตั้งค่า
tf_config['task']['index']=1
ตัวแปรสภาพแวดล้อมและกระบวนการย่อยในโน้ตบุ๊ก
กระบวนการย่อยรับช่วงตัวแปรสภาพแวดล้อมจากพาเรนต์
ตัวอย่างเช่น คุณสามารถตั้งค่าตัวแปรสภาพแวดล้อมในกระบวนการ Jupyter Notebook ได้ดังนี้:
os.environ['GREETINGS'] = 'Hello TensorFlow!'
จากนั้น คุณสามารถเข้าถึงตัวแปรสภาพแวดล้อมจากกระบวนการย่อยได้:
echo ${GREETINGS}
Hello TensorFlow!
ในส่วนถัดไป คุณจะใช้วิธีการที่คล้ายกันเพื่อส่ง TF_CONFIG
ไปยังกระบวนการย่อยของผู้ปฏิบัติงาน ในสถานการณ์จริง คุณจะไม่เริ่มงานด้วยวิธีนี้ แต่ในตัวอย่างนี้ก็เพียงพอแล้ว
เลือกกลยุทธ์ที่เหมาะสม
ใน TensorFlow การฝึกอบรมแบบกระจายมีสองรูปแบบหลัก:
- การฝึกอบรมแบบซิงโครนัส โดยที่ขั้นตอนของการฝึกอบรมจะซิงค์กันระหว่างผู้ปฏิบัติงานและแบบจำลอง และ
- การฝึกอบรมแบบอะซิงโครนัส โดยที่ขั้นตอนการฝึกอบรมไม่ได้ซิงค์อย่างเคร่งครัด (เช่น การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ )
บทช่วยสอนนี้สาธิตวิธีดำเนินการฝึกอบรมผู้ปฏิบัติงานหลายคนแบบซิงโครนัสโดยใช้อินสแตนซ์ของ tf.distribute.MultiWorkerMirroredStrategy
MultiWorkerMirroredStrategy
สร้างสำเนาของตัวแปรทั้งหมดในเลเยอร์ของโมเดลบนอุปกรณ์แต่ละเครื่องของผู้ปฏิบัติงานทุกคน มันใช้ CollectiveOps
ซึ่งเป็น TensorFlow op สำหรับการสื่อสารแบบกลุ่ม เพื่อรวมการไล่ระดับสีและทำให้ตัวแปรซิงค์กัน คู่มือ tf.distribute.Strategy
มีรายละเอียดเพิ่มเติมเกี่ยวกับกลยุทธ์นี้
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled. INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
MultiWorkerMirroredStrategy
มีการใช้งานหลายอย่าง tf.distribute.experimental.CommunicationOptions
RING
2) NCCL
ใช้ NVIDIA Collective Communication Library เพื่อใช้งานส่วนรวม; และ 3) AUTO
เลื่อนตัวเลือกไปที่รันไทม์ ทางเลือกที่ดีที่สุดของการใช้งานแบบรวมขึ้นอยู่กับจำนวนและประเภทของ GPU และการเชื่อมต่อเครือข่ายในคลัสเตอร์
ฝึกโมเดล
ด้วยการรวม tf.distribute.Strategy
API เข้ากับ tf.keras
การเปลี่ยนแปลงเพียงอย่างเดียวที่คุณจะทำเพื่อแจกจ่ายการฝึกอบรมให้กับผู้ปฏิบัติงานหลายคนคือการปิด model building และ model.compile()
ไว้ภายใน strategy.scope()
ขอบเขตของกลยุทธ์การกระจายกำหนดวิธีการและตำแหน่งที่ตัวแปรถูกสร้างขึ้น และในกรณีของ MultiWorkerMirroredStrategy
ตัวแปรที่สร้างขึ้นคือ MirroredVariable
และจะถูกจำลองแบบกับผู้ปฏิบัติงานแต่ละคน
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
หากต้องการใช้งาน MultiWorkerMirroredStrategy
คุณจะต้องเรียกใช้กระบวนการของผู้ปฏิบัติงานและส่ง TF_CONFIG
ให้พวกเขา
เช่นเดียวกับไฟล์ mnist_setup.py
ที่เขียนไว้ก่อนหน้านี้ นี่คือ main.py
ที่พนักงานแต่ละคนจะทำงาน:
%%writefile main.py
import os
import json
import tensorflow as tf
import mnist_setup
per_worker_batch_size = 64
tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])
strategy = tf.distribute.MultiWorkerMirroredStrategy()
global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)
with strategy.scope():
# Model building/compiling need to be within `strategy.scope()`.
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
Writing main.py
ในข้อมูลโค้ดด้านบน โปรดทราบว่า global_batch_size
ซึ่งส่งผ่านไปยัง Dataset.batch
ถูกตั้งค่าเป็น per_worker_batch_size * num_workers
เพื่อให้แน่ใจว่าผู้ปฏิบัติงานแต่ละคนประมวลผลกลุ่มตัวอย่าง per_worker_batch_size
โดยไม่คำนึงถึงจำนวนผู้ปฏิบัติงาน
ไดเร็กทอรีปัจจุบันมีทั้งไฟล์ Python:
ls *.py
main.py mnist_setup.pyตัวยึดตำแหน่ง22
ดังนั้น json-serialize TF_CONFIG
และเพิ่มลงในตัวแปรสภาพแวดล้อม:
os.environ['TF_CONFIG'] = json.dumps(tf_config)
ตอนนี้คุณสามารถเปิดกระบวนการของผู้ปฏิบัติงานที่จะเรียกใช้ main.py
และใช้ TF_CONFIG
:
# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log
มีบางสิ่งที่ควรทราบเกี่ยวกับคำสั่งข้างต้น:
- มันใช้
%%bash
ซึ่งเป็น "เวทย์มนตร์" ของโน้ตบุ๊ก เพื่อเรียกใช้คำสั่งทุบตี - มันใช้แฟ
--bg
เพื่อรันกระบวนการbash
ในพื้นหลัง เนื่องจากผู้ปฏิบัติงานนี้จะไม่ยุติการทำงาน มันรอคนงานทั้งหมดก่อนที่จะเริ่ม
กระบวนการทำงานเบื้องหลังจะไม่พิมพ์ผลลัพธ์ไปยังสมุดบันทึกนี้ ดังนั้น &>
จึงเปลี่ยนเส้นทางเอาต์พุตไปยังไฟล์ เพื่อให้คุณสามารถตรวจสอบสิ่งที่เกิดขึ้นในไฟล์บันทึกได้ในภายหลัง
ดังนั้น รอสักครู่เพื่อให้กระบวนการเริ่มต้นขึ้น:
import time
time.sleep(10)
ตอนนี้ ให้ตรวจสอบสิ่งที่ส่งออกไปยังล็อกไฟล์ของพนักงาน:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
บรรทัดสุดท้ายของล็อกไฟล์ควรระบุว่า: Started server with target: grpc://localhost:12345
ตอนนี้ผู้ปฏิบัติงานคนแรกพร้อมแล้ว และกำลังรอผู้ปฏิบัติงานคนอื่นๆ ให้พร้อมดำเนินการต่อไป
ดังนั้นให้อัปเดต tf_config
เพื่อให้กระบวนการของผู้ปฏิบัติงานคนที่สองได้รับ:
tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)
เปิดตัวคนงานที่สอง การดำเนินการนี้จะเริ่มการฝึกอบรมเนื่องจากพนักงานทุกคนทำงานอยู่ (ดังนั้นจึงไม่จำเป็นต้องดำเนินการตามขั้นตอน):
python main.py
Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901 2022-02-05 02:21:16.367945: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.234030: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.450972: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.ตัวยึดตำแหน่ง32
หากคุณตรวจสอบบันทึกที่เขียนโดยผู้ปฏิบัติงานคนแรกอีกครั้ง คุณจะได้เรียนรู้ว่าบันทึกนั้นได้เข้าร่วมในการฝึกอบรมโมเดลดังกล่าว:
cat job_0.log
2022-02-05 02:21:06.348503: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected 2022-02-05 02:21:17.232316: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:0" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-02-05 02:21:17.457812: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. Epoch 1/3 70/70 [==============================] - 6s 51ms/step - loss: 2.2766 - accuracy: 0.1722 Epoch 2/3 70/70 [==============================] - 3s 48ms/step - loss: 2.2172 - accuracy: 0.4157 Epoch 3/3 70/70 [==============================] - 3s 49ms/step - loss: 2.1471 - accuracy: 0.5901
ไม่น่าแปลกใจเลยที่การดำเนินการนี้ ช้า กว่าการทดสอบในตอนต้นของบทช่วยสอนนี้
การเรียกใช้คนงานหลายคนในเครื่องเดียวจะเพิ่มค่าใช้จ่ายเท่านั้น
เป้าหมายที่นี่ไม่ใช่เพื่อปรับปรุงเวลาการฝึกอบรม แต่เพียงเพื่อให้ตัวอย่างการฝึกอบรมหลายคน
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.ตัวยึดตำแหน่ง36
การฝึกอบรมพนักงานหลายคนในเชิงลึก
จนถึงตอนนี้ คุณได้เรียนรู้วิธีตั้งค่าพื้นฐานสำหรับผู้ปฏิบัติงานหลายคนแล้ว
ในช่วงที่เหลือของบทช่วยสอน คุณจะได้เรียนรู้เกี่ยวกับปัจจัยอื่นๆ ที่อาจเป็นประโยชน์หรือสำคัญสำหรับกรณีการใช้งานจริงโดยละเอียด
การแบ่งกลุ่มข้อมูล
ในการฝึกอบรมผู้ปฏิบัติงานหลายคน จำเป็นต้องมีการ แบ่งกลุ่มข้อมูล เพื่อให้แน่ใจว่ามีการบรรจบกันและประสิทธิภาพ
ตัวอย่างในส่วนก่อนหน้านี้อาศัย autosharding เริ่มต้นที่จัดเตรียมโดย tf.distribute.Strategy
API คุณสามารถควบคุมการแบ่งส่วนข้อมูลโดยการตั้งค่า tf.data.experimental.AutoShardPolicy
ของ tf.data.experimental.DistributeOptions
หากต้องการเรียนรู้เพิ่มเติมเกี่ยวกับการชา ร์ดอัตโนมัติ โปรดดู คู่มือการป้อนข้อมูลแบบกระจาย
ต่อไปนี้คือตัวอย่างโดยย่อของวิธีปิดการแบ่งกลุ่มอัตโนมัติ เพื่อให้แต่ละแบบจำลองประมวลผลทุกตัวอย่าง ( ไม่แนะนำ ):
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
global_batch_size = 64
multi_worker_dataset = mnist_setup.mnist_dataset(batch_size=64)
dataset_no_auto_shard = multi_worker_dataset.with_options(options)
การประเมิน
หากคุณส่งผ่าน validation_data
ไปที่ Model.fit
มันจะสลับกันระหว่างการฝึกอบรมและการประเมินสำหรับแต่ละยุค การประเมินที่ใช้ validation_data
จะถูกกระจายไปยังผู้ปฏิบัติงานชุดเดียวกัน และผลการประเมินจะถูกรวบรวมและพร้อมใช้งานสำหรับผู้ปฏิบัติงานทั้งหมด
เช่นเดียวกับการฝึก ชุดข้อมูลการตรวจสอบจะถูกแบ่งโดยอัตโนมัติที่ระดับไฟล์ คุณต้องตั้งค่าขนาดแบตช์ส่วนกลางในชุดข้อมูลการตรวจสอบและตั้งค่า validation_steps
ขอแนะนำให้ใช้ชุดข้อมูลซ้ำสำหรับการประเมิน
อีกวิธีหนึ่ง คุณยังสามารถสร้างงานอื่นที่อ่านจุดตรวจสอบเป็นระยะและดำเนินการประเมินได้ นี่คือสิ่งที่ Estimator ทำ แต่นี่ไม่ใช่วิธีที่แนะนำในการประเมิน ดังนั้นจึงละเว้นรายละเอียด
ประสิทธิภาพ
ตอนนี้คุณมีโมเดล Keras ที่ตั้งค่าให้ทำงานในผู้ปฏิบัติงานหลายคนด้วย MultiWorkerMirroredStrategy
ในการปรับแต่งประสิทธิภาพของการฝึกอบรมผู้ปฏิบัติงานหลายคน คุณสามารถลองทำสิ่งต่อไปนี้:
tf.distribute.MultiWorkerMirroredStrategy
จัดเตรียม การใช้งานการสื่อสารแบบรวม หลายรายการ:-
RING
ใช้กลุ่มตามวงแหวนโดยใช้ gRPC เป็นเลเยอร์การสื่อสารข้ามโฮสต์ -
NCCL
ใช้ NVIDIA Collective Communication Library เพื่อใช้งานส่วนรวม -
AUTO
เลื่อนตัวเลือกไปที่รันไทม์
ทางเลือกที่ดีที่สุดของการใช้งานแบบรวมขึ้นอยู่กับจำนวนของ GPU ประเภทของ GPU และการเชื่อมต่อเครือข่ายในคลัสเตอร์ หากต้องการลบล้างตัวเลือกอัตโนมัติ ให้ระบุพารามิเตอร์
communication_options
ของ Constructor ของMultiWorkerMirroredStrategy
ตัวอย่างเช่น:communication_options=tf.distribute.experimental.CommunicationOptions(implementation=tf.distribute.experimental.CollectiveCommunication.NCCL)
-
ส่งตัวแปรไปที่
tf.float
ถ้าเป็นไปได้:- โมเดล ResNet อย่างเป็นทางการมี ตัวอย่าง วิธีการทำสิ่งนี้
ความทนทานต่อความผิดพลาด
ในการฝึกอบรมแบบซิงโครนัส คลัสเตอร์จะล้มเหลวหากพนักงานคนใดคนหนึ่งล้มเหลวและไม่มีกลไกการกู้คืนความล้มเหลวอยู่
การใช้ Keras กับ tf.distribute.Strategy
มาพร้อมกับข้อดีของความทนทานต่อข้อผิดพลาดในกรณีที่คนงานเสียชีวิตหรือไม่เสถียร คุณสามารถทำได้โดยคงสถานะการฝึกไว้ในระบบไฟล์แบบกระจายที่คุณเลือก เช่น เมื่อรีสตาร์ทอินสแตนซ์ที่ล้มเหลวหรือจองไว้ก่อนหน้านี้ สถานะการฝึกจะถูกกู้คืน
เมื่อผู้ปฏิบัติงานไม่พร้อมใช้งาน ผู้ปฏิบัติงานอื่นจะล้มเหลว (อาจหลังจากหมดเวลา) ในกรณีดังกล่าว ผู้ปฏิบัติงานที่ไม่พร้อมใช้งานจำเป็นต้องเริ่มต้นใหม่ เช่นเดียวกับผู้ปฏิบัติงานอื่นที่ล้มเหลว
ModelCheckpoint โทรกลับ
การเรียกกลับ ModelCheckpoint
ไม่มีฟังก์ชันความทนทานต่อข้อผิดพลาดอีกต่อไป โปรดใช้การเรียกกลับ BackupAndRestore
แทน
ยังสามารถใช้การเรียกกลับ ModelCheckpoint
เพื่อบันทึกจุดตรวจได้ แต่ด้วยสิ่งนี้ หากการฝึกอบรมถูกขัดจังหวะหรือเสร็จสิ้น เพื่อที่จะดำเนินการฝึกอบรมต่อจากจุดตรวจ ผู้ใช้มีหน้าที่โหลดแบบจำลองด้วยตนเอง
ผู้ใช้สามารถเลือกที่จะบันทึกและกู้คืนรุ่น/น้ำหนักภายนอกการเรียกกลับของ ModelCheckpoint
การบันทึกและการโหลดโมเดล
ในการบันทึกแบบจำลองของคุณโดยใช้ model.save
หรือ tf.saved_model.save
ปลายทางการบันทึกจะต้องแตกต่างกันสำหรับผู้ปฏิบัติงานแต่ละคน
- สำหรับผู้ปฏิบัติงานที่ไม่ใช่หัวหน้างาน คุณจะต้องบันทึกโมเดลลงในไดเร็กทอรีชั่วคราว
- สำหรับหัวหน้า คุณจะต้องบันทึกลงในไดเร็กทอรีโมเดลที่ให้มา
ไดเร็กทอรีชั่วคราวของผู้ปฏิบัติงานต้องไม่ซ้ำกันเพื่อป้องกันข้อผิดพลาดที่เกิดจากผู้ปฏิบัติงานหลายคนพยายามเขียนไปยังตำแหน่งเดียวกัน
แบบจำลองที่บันทึกไว้ในไดเร็กทอรีทั้งหมดนั้นเหมือนกัน และโดยทั่วไปควรอ้างอิงเฉพาะแบบจำลองที่บันทึกไว้โดยหัวหน้าเท่านั้นเพื่อการกู้คืนหรือให้บริการ
คุณควรมีตรรกะการล้างข้อมูลที่จะลบไดเร็กทอรีชั่วคราวที่สร้างโดยผู้ปฏิบัติงานเมื่อการฝึกอบรมของคุณเสร็จสิ้น
เหตุผลในการประหยัดค่าใช้จ่ายสำหรับหัวหน้าและพนักงานก็เพราะว่าคุณอาจกำลังรวมตัวแปรระหว่างจุดตรวจ ซึ่งต้องใช้ทั้งหัวหน้าและพนักงานเพื่อเข้าร่วมในโปรโตคอลการสื่อสาร allreduce ในทางกลับกัน การปล่อยให้หัวหน้าและพนักงานบันทึกลงในไดเร็กทอรีรุ่นเดียวกันจะส่งผลให้เกิดข้อผิดพลาดเนื่องจากการโต้แย้ง
การใช้ MultiWorkerMirroredStrategy
โปรแกรมจะทำงานกับผู้ปฏิบัติงานทุกคน และเพื่อที่จะทราบว่าผู้ปฏิบัติงานปัจจุบันเป็นหัวหน้าหรือไม่ โปรแกรมจะใช้ประโยชน์จากอ็อบเจ็กต์ตัวแก้ไขคลัสเตอร์ที่มีแอตทริบิวต์ task_type
และ task_id
:
-
task_type
จะบอกคุณว่างานปัจจุบันคืออะไร (เช่น'worker'
) -
task_id
จะบอกให้คุณทราบถึงตัวระบุของผู้ปฏิบัติงาน - ผู้ปฏิบัติงานที่มี
task_id == 0
ถูกกำหนดให้เป็นหัวหน้าคนงาน
ในข้อมูลโค้ดด้านล่าง ฟังก์ชัน write_filepath
จัดเตรียมพาธของไฟล์ที่จะเขียน ซึ่งขึ้นอยู่กับ task_id
ของผู้ปฏิบัติงาน:
- สำหรับหัวหน้าคนงาน (ด้วย
task_id == 0
) จะเขียนไปยังพาธไฟล์ดั้งเดิม - สำหรับผู้ปฏิบัติงานอื่นๆ จะสร้างไดเร็กทอรีชั่วคราว—
temp_dir
— โดยมีtask_id
ในพาธไดเร็กทอรีที่จะเขียนใน:
model_path = '/tmp/keras-model'
def _is_chief(task_type, task_id):
# Note: there are two possible `TF_CONFIG` configuration.
# 1) In addition to `worker` tasks, a `chief` task type is use;
# in this case, this function should be modified to
# `return task_type == 'chief'`.
# 2) Only `worker` task type is used; in this case, worker 0 is
# regarded as the chief. The implementation demonstrated here
# is for this case.
# For the purpose of this Colab section, the `task_type is None` case
# is added because it is effectively run with only a single worker.
return (task_type == 'worker' and task_id == 0) or task_type is None
def _get_temp_dir(dirpath, task_id):
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
write_model_path = write_filepath(model_path, task_type, task_id)
เท่านี้คุณก็พร้อมที่จะบันทึกแล้ว:
multi_worker_model.save(write_model_path)
2022-02-05 02:21:31.809502: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/keras-model/assets INFO:tensorflow:Assets written to: /tmp/keras-model/assetsตัวยึดตำแหน่ง41
ตามที่อธิบายไว้ข้างต้น ในภายหลังควรโหลดโมเดลจากเส้นทางที่หัวหน้าบันทึกไว้เท่านั้น ดังนั้นเรามาลบโมเดลชั่วคราวที่คนงานที่ไม่ใช่หัวหน้าบันทึกไว้:
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(os.path.dirname(write_model_path))
ตอนนี้ เมื่อถึงเวลาโหลด ลองใช้ tf.keras.models.load_model
API ที่สะดวก และทำงานต่อไป
ในที่นี้ สมมติโดยใช้ผู้ปฏิบัติงานคนเดียวในการโหลดและดำเนินการฝึกอบรมต่อ ซึ่งในกรณีนี้ คุณจะไม่เรียก tf.keras.models.load_model
ภายใน strategy.scope()
อื่น (โปรดทราบว่า strategy = tf.distribute.MultiWorkerMirroredStrategy()
ตามที่กำหนดไว้ก่อนหน้านี้ ):
loaded_model = tf.keras.models.load_model(model_path)
# Now that the model is restored, and can continue with the training.
loaded_model.fit(single_worker_dataset, epochs=2, steps_per_epoch=20)
Epoch 1/2 20/20 [==============================] - 1s 12ms/step - loss: 2.2949 - accuracy: 0.0492 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2680 - accuracy: 0.0773 <keras.callbacks.History at 0x7f6669989750>
การบันทึกและฟื้นฟูจุดตรวจ
ในทางกลับกัน จุดตรวจสอบช่วยให้คุณสามารถบันทึกน้ำหนักของแบบจำลองของคุณและเรียกคืนได้โดยไม่ต้องบันทึกทั้งแบบจำลอง
ที่นี่ คุณจะต้องสร้าง tf.train.Checkpoint
ที่ติดตามโมเดล ซึ่งจัดการโดย tf.train.CheckpointManager
เพื่อให้คงไว้เฉพาะจุดตรวจล่าสุดเท่านั้น:
checkpoint_dir = '/tmp/ckpt'
checkpoint = tf.train.Checkpoint(model=multi_worker_model)
write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id)
checkpoint_manager = tf.train.CheckpointManager(
checkpoint, directory=write_checkpoint_dir, max_to_keep=1)
เมื่อตั้งค่า CheckpointManager
แล้ว คุณก็พร้อมที่จะบันทึกและลบจุดตรวจที่ผู้ปฏิบัติงานที่ไม่ใช่หัวหน้าบันทึกไว้:
checkpoint_manager.save()
if not _is_chief(task_type, task_id):
tf.io.gfile.rmtree(write_checkpoint_dir)
ตอนนี้ เมื่อคุณต้องการคืนค่าโมเดล คุณสามารถค้นหาจุดตรวจล่าสุดที่บันทึกไว้โดยใช้ฟังก์ชัน tf.train.latest_checkpoint
ที่สะดวก หลังจากฟื้นฟูจุดตรวจแล้วคุณสามารถฝึกต่อได้
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)
multi_worker_model.fit(multi_worker_dataset, epochs=2, steps_per_epoch=20)
2022-02-05 02:21:33.584421: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/2 2022-02-05 02:21:33.803317: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. 20/20 [==============================] - 3s 13ms/step - loss: 2.2970 - accuracy: 0.0547 Epoch 2/2 20/20 [==============================] - 0s 13ms/step - loss: 2.2690 - accuracy: 0.0938 <keras.callbacks.History at 0x7f6669589850>
BackupAndRestore โทรกลับ
การเรียกกลับ tf.keras.callbacks.BackupAndRestore
จัดเตรียมฟังก์ชันความทนทานต่อข้อผิดพลาด โดยการสำรองข้อมูลรุ่นและหมายเลขยุคปัจจุบันในไฟล์จุดตรวจสอบชั่วคราวภายใต้อาร์กิวเมนต์ backup_dir
ไปยัง BackupAndRestore
สิ่งนี้จะทำในตอนท้ายของแต่ละยุค
เมื่องานถูกขัดจังหวะและเริ่มต้นใหม่ การเรียกกลับจะคืนค่าจุดตรวจสุดท้าย และการฝึกอบรมจะดำเนินต่อไปตั้งแต่เริ่มต้นยุคที่ถูกขัดจังหวะ การฝึกบางส่วนที่ทำไปแล้วในยุคที่ยังไม่เสร็จก่อนการหยุดชะงักจะถูกโยนทิ้งไป เพื่อไม่ให้กระทบต่อสถานะของโมเดลขั้นสุดท้าย
หากต้องการใช้งาน ให้ระบุอินสแตนซ์ของ tf.keras.callbacks.BackupAndRestore
ที่การโทร Model.fit
ด้วย MultiWorkerMirroredStrategy
หากผู้ปฏิบัติงานถูกขัดจังหวะ ทั้งคลัสเตอร์จะหยุดชั่วคราวจนกว่าผู้ปฏิบัติงานที่ถูกขัดจังหวะจะเริ่มต้นใหม่ ผู้ปฏิบัติงานคนอื่นๆ จะรีสตาร์ทด้วย และผู้ปฏิบัติงานที่ถูกขัดจังหวะจะเข้าร่วมคลัสเตอร์อีกครั้ง จากนั้น พนักงานทุกคนจะอ่านไฟล์จุดตรวจสอบที่บันทึกไว้ก่อนหน้านี้และเลือกสถานะเดิม ซึ่งจะทำให้คลัสเตอร์กลับมาซิงค์กันได้ จากนั้นการฝึกอบรมจะดำเนินต่อไป
การเรียกกลับของ BackupAndRestore
ใช้ CheckpointManager
เพื่อบันทึกและกู้คืนสถานะการฝึก ซึ่งจะสร้างไฟล์ที่เรียกว่าจุดตรวจสอบ ซึ่งติดตามจุดตรวจสอบที่มีอยู่พร้อมกับจุดตรวจสอบล่าสุด ด้วยเหตุผลนี้ ไม่ควรใช้ backup_dir
ซ้ำเพื่อจัดเก็บจุดตรวจอื่นๆ เพื่อหลีกเลี่ยงความขัดแย้งของชื่อ
ในปัจจุบัน การเรียกกลับของ BackupAndRestore
รองรับการฝึกอบรมพนักงานคนเดียวโดยไม่มีกลยุทธ์— MirroredStrategy
— และการฝึกอบรมหลายคนด้วย MultiWorkerMirroredStrategy
ด้านล่างนี้คือตัวอย่างสองตัวอย่างสำหรับการฝึกอบรมทั้งแบบหลายคนและคนเดียว:
# Multi-worker training with `MultiWorkerMirroredStrategy`
# and the `BackupAndRestore` callback.
callbacks = [tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')]
with strategy.scope():
multi_worker_model = mnist_setup.build_and_compile_cnn_model()
multi_worker_model.fit(multi_worker_dataset,
epochs=3,
steps_per_epoch=70,
callbacks=callbacks)
2022-02-05 02:21:37.063622: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2" op: "TensorSliceDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_INT64 } } } attr { key: "_cardinality" value { i: 60000 } } attr { key: "is_files" value { b: false } } attr { key: "metadata" value { s: "\n\024TensorSliceDataset:5" } } attr { key: "output_shapes" value { list { shape { dim { size: 28 } dim { size: 28 } } shape { } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } Epoch 1/3 70/70 [==============================] - 3s 13ms/step - loss: 2.2667 - accuracy: 0.2123 Epoch 2/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1925 - accuracy: 0.4509 Epoch 3/3 70/70 [==============================] - 1s 13ms/step - loss: 2.1057 - accuracy: 0.5614 <keras.callbacks.History at 0x7f6669555d90>
หากคุณตรวจสอบไดเร็กทอรีของ backup_dir
ที่คุณระบุไว้ใน BackupAndRestore
คุณอาจสังเกตเห็นไฟล์จุดตรวจสอบที่สร้างขึ้นชั่วคราว ไฟล์เหล่านี้จำเป็นสำหรับการกู้คืนอินสแตนซ์ที่หายไปก่อนหน้านี้ และไลบรารีจะถูกลบออกเมื่อสิ้นสุด Model.fit
เมื่อคุณออกจากการฝึกสำเร็จ
แหล่งข้อมูลเพิ่มเติม
- คู่มือ การฝึกอบรมแบบกระจายใน TensorFlow จะให้ภาพรวมของกลยุทธ์การจัดจำหน่ายที่มีอยู่
- ลูปการฝึกแบบกำหนดเองด้วย Keras และ บทช่วยสอน MultiWorkerMirroredStrategy จะแสดงวิธีใช้
MultiWorkerMirroredStrategy
กับ Keras และลูปการฝึกแบบกำหนดเอง - ตรวจสอบ รุ่นอย่างเป็นทางการ ซึ่งหลายรุ่น สามารถกำหนดค่าให้เรียกใช้กลยุทธ์การจัดจำหน่ายได้หลายแบบ
- คู่มือประสิทธิภาพที่ ดีขึ้นด้วย tf.function ให้ข้อมูลเกี่ยวกับกลยุทธ์และเครื่องมืออื่นๆ เช่น TensorFlow Profiler ที่คุณสามารถใช้เพื่อเพิ่มประสิทธิภาพการทำงานของโมเดล TensorFlow ของคุณ