ดูบน TensorFlow.org | ทำงานใน Google Colab | ดูแหล่งที่มาบน GitHub | ดาวน์โหลดโน๊ตบุ๊ค |
ภาพรวม
การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ เป็นวิธีคู่ขนานข้อมูลทั่วไปในการขยายขนาดการฝึกโมเดลบนเครื่องหลายเครื่อง
คลัสเตอร์การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ประกอบด้วย ผู้ปฏิบัติงาน และ เซิร์ฟเวอร์พารามิเตอร์ ตัวแปรถูกสร้างขึ้นบนเซิร์ฟเวอร์พารามิเตอร์ และจะถูกอ่านและอัปเดตโดยผู้ปฏิบัติงานในแต่ละขั้นตอน โดยค่าเริ่มต้น ผู้ปฏิบัติงานจะอ่านและอัปเดตตัวแปรเหล่านี้อย่างอิสระโดยไม่ทำข้อมูลให้ตรงกัน นี่คือเหตุผลที่บางครั้งการฝึกแบบพารามิเตอร์เซิร์ฟเวอร์เรียกว่า การฝึกแบบอะซิงโครนัส
ใน TensorFlow 2 การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ขับเคลื่อนโดยคลาส tf.distribute.experimental.ParameterServerStrategy
ซึ่งกระจายขั้นตอนการฝึกอบรมไปยังคลัสเตอร์ที่ขยายได้ถึงหลายพันคน (พร้อมกับเซิร์ฟเวอร์พารามิเตอร์)
วิธีการฝึกอบรมที่รองรับ
มีสองวิธีการฝึกอบรมที่ได้รับการสนับสนุนหลัก:
- Keras
Model.fit
API ซึ่งแนะนำเมื่อคุณต้องการนามธรรมระดับสูงและการจัดการการฝึกอบรม - ลูปการฝึกแบบกำหนดเอง (คุณสามารถอ้างอิงถึง การ ฝึกแบบกำหนดเอง การ เขียนลูปการฝึกตั้งแต่เริ่มต้น และ ลูปการฝึกแบบกำหนดเองด้วย Keras และ MultiWorkerMirroredStrategy สำหรับรายละเอียดเพิ่มเติม) ขอแนะนำการฝึกวนรอบแบบกำหนดเองเมื่อคุณต้องการกำหนดรายละเอียดของลูปการฝึกของพวกเขา
คลัสเตอร์ที่มีงานและงาน
โดยไม่คำนึงถึง API ที่เลือก ( Model.fit
หรือลูปการฝึกอบรมแบบกำหนดเอง) การฝึกอบรมแบบกระจายใน TensorFlow 2 เกี่ยวข้องกับ: 'cluster'
ที่มี 'jobs'
หลายรายการ และแต่ละงานอาจมี 'tasks'
อย่างน้อยหนึ่งรายการ
เมื่อใช้การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ ขอแนะนำให้มี:
- หนึ่ง งานผู้ประสาน งาน (ซึ่งมีชื่องาน
chief
) - งานของผู้ปฏิบัติงานหลาย คน (ชื่อ
worker
); และ - งาน เซิร์ฟเวอร์พารามิเตอร์ หลายรายการ (ชื่องาน
ps
)
ในขณะที่ผู้ ประสานงาน สร้างทรัพยากร ส่งงานฝึกอบรม เขียนจุดตรวจสอบ และจัดการกับความล้มเหลวของ งาน ผู้ปฏิบัติงาน และ เซิร์ฟเวอร์พารามิเตอร์ จะเรียกใช้ tf.distribute.Server
ที่รับฟังคำขอจากผู้ประสานงาน
การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ด้วย Model.fit
API
การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ด้วย Model.fit
API ต้องการให้ผู้ประสานงานใช้อ็อบเจ็กต์ tf.distribute.experimental.ParameterServerStrategy
และ tf.keras.utils.experimental.DatasetCreator
เป็นอินพุต คล้ายกับการใช้งาน Model.fit
โดยไม่มีกลยุทธ์หรือกับกลยุทธ์อื่นๆ เวิร์กโฟลว์เกี่ยวข้องกับการสร้างและรวบรวมโมเดล การเตรียมการเรียกกลับ ตามด้วยการเรียก Model.fit
การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์พร้อมลูปการฝึกแบบกำหนดเอง
ด้วยลูปการฝึกแบบกำหนดเอง คลาส tf.distribute.experimental.coordinator.ClusterCoordinator
เป็นองค์ประกอบหลักที่ใช้สำหรับผู้ประสานงาน
- คลาส
ClusterCoordinator
ต้องทำงานร่วมกับอ็อบเจ็กต์tf.distribute.Strategy
- ออบเจ็กต์
tf.distribute.Strategy
นี้จำเป็นสำหรับการจัดเตรียมข้อมูลของคลัสเตอร์ และใช้เพื่อกำหนดขั้นตอนการฝึก ดังที่แสดงไว้ใน การฝึกแบบกำหนดเองด้วย tf.distribute.Strategy - วัตถุ
ClusterCoordinator
จะส่งการดำเนินการของขั้นตอนการฝึกอบรมเหล่านี้ไปยังผู้ปฏิบัติงานระยะไกล - สำหรับการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์
ClusterCoordinator
จำเป็นต้องทำงานกับtf.distribute.experimental.ParameterServerStrategy
API ที่สำคัญที่สุดที่จัดเตรียมโดยอ็อบเจ็กต์ ClusterCoordinator
คือ schedule
:
- API
schedule
คิวtf.function
และส่งคืนRemoteValue
ที่เหมือนอนาคตทันที - ฟังก์ชันที่อยู่ในคิวจะถูกส่งไปยังผู้ปฏิบัติงานระยะไกลในเธรดพื้นหลัง และ
RemoteValue
ของพวกเขาจะถูกเติมแบบอะซิงโครนัส - เนื่องจาก
schedule
ไม่ต้องการการมอบหมายผู้ปฏิบัติงานtf.function
ที่ส่งผ่านจึงสามารถดำเนินการกับผู้ปฏิบัติงานที่มีอยู่ได้ - หากผู้ปฏิบัติงานที่ดำเนินการอยู่ไม่พร้อมใช้งานก่อนที่จะเสร็จสมบูรณ์ ฟังก์ชันจะถูกลองอีกครั้งกับผู้ปฏิบัติงานอื่นที่พร้อมใช้งาน
- เนื่องจากข้อเท็จจริงนี้และการเรียกใช้ฟังก์ชันไม่ใช่อะตอมมิก ฟังก์ชันอาจถูกเรียกใช้งานมากกว่าหนึ่งครั้ง
นอกจากการส่งฟังก์ชันระยะไกลแล้ว ClusterCoordinator
ยังช่วยสร้างชุดข้อมูลบนผู้ปฏิบัติงานทั้งหมด และสร้างชุดข้อมูลเหล่านี้ใหม่เมื่อผู้ปฏิบัติงานกู้คืนจากความล้มเหลว
ตั้งค่าการสอน
บทช่วยสอนจะแยกสาขาออกเป็น Model.fit
และเส้นทางวนรอบการฝึกแบบกำหนดเอง และคุณสามารถเลือกเส้นทางที่เหมาะกับความต้องการของคุณได้ ส่วนอื่นที่ไม่ใช่ "การฝึกกับ X" ใช้ได้กับทั้งสองเส้นทาง
pip install portpicker
import multiprocessing
import os
import random
import portpicker
import tensorflow as tf
การตั้งค่าคลัสเตอร์
ดังที่กล่าวไว้ข้างต้น คลัสเตอร์การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ต้องการงานผู้ประสานงานที่รันโปรแกรมการฝึกอบรมของคุณ ผู้ปฏิบัติงานหนึ่งรายหรือหลายราย และงานเซิร์ฟเวอร์พารามิเตอร์ที่เรียกใช้เซิร์ฟเวอร์ tf.distribute.Server
— และอาจเป็นงานการประเมินเพิ่มเติมที่เรียกใช้การประเมินแบบ side-car (ดูส่วนการประเมินรถด้านข้างด้านล่าง) ข้อกำหนดในการตั้งค่าคือ:
- งานผู้ประสานงานจำเป็นต้องทราบที่อยู่และพอร์ตของเซิร์ฟเวอร์ TensorFlow อื่นๆ ทั้งหมด ยกเว้นผู้ประเมิน
- ผู้ปฏิบัติงานและเซิร์ฟเวอร์พารามิเตอร์จำเป็นต้องรู้ว่าพอร์ตใดที่พวกเขาต้องรับฟัง เพื่อความง่าย คุณสามารถส่งข้อมูลคลัสเตอร์ที่สมบูรณ์เมื่อสร้างเซิร์ฟเวอร์ TensorFlow ในงานเหล่านี้
- งานผู้ประเมินไม่จำเป็นต้องรู้การตั้งค่าของคลัสเตอร์การฝึกอบรม หากเป็นเช่นนั้น ก็ไม่ควรพยายามเชื่อมต่อกับคลัสเตอร์การฝึก
- ผู้ปฏิบัติงานและเซิร์ฟเวอร์พารามิเตอร์ควรมีประเภทงานเป็น
"worker"
และ"ps"
ตามลำดับ ผู้ประสานงานควรใช้"chief"
เป็นประเภทงานด้วยเหตุผลเดิม
ในบทช่วยสอนนี้ คุณจะสร้างคลัสเตอร์ระหว่างดำเนินการเพื่อให้สามารถเรียกใช้การฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ทั้งหมดใน Colab คุณจะได้เรียนรู้วิธีตั้งค่า คลัสเตอร์จริง ในส่วนภายหลัง
คลัสเตอร์ในกระบวนการ
คุณจะเริ่มต้นด้วยการสร้างเซิร์ฟเวอร์ TensorFlow หลายตัวล่วงหน้าและเชื่อมต่อในภายหลัง โปรดทราบว่านี่เป็นเพียงเพื่อการสาธิตของบทช่วยสอนนี้เท่านั้น และในการฝึกอบรมจริง เซิร์ฟเวอร์จะเริ่มต้นบนเครื่อง "worker"
และ "ps"
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict["worker"] = ["localhost:%s" % port for port in worker_ports]
if num_ps > 0:
cluster_dict["ps"] = ["localhost:%s" % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter_ops threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name="worker",
task_index=i,
config=worker_config,
protocol="grpc")
for i in range(num_ps):
tf.distribute.Server(
cluster_spec,
job_name="ps",
task_index=i,
protocol="grpc")
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer="grpc")
return cluster_resolver
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
NUM_WORKERS = 3
NUM_PS = 2
cluster_resolver = create_in_process_cluster(NUM_WORKERS, NUM_PS)
การตั้งค่าคลัสเตอร์ในกระบวนการมักใช้ในการทดสอบหน่วย เช่น ที่นี่
อีกทางเลือกหนึ่งสำหรับการทดสอบในเครื่องคือการเริ่มกระบวนการบนเครื่องในเครื่อง ดูตัวอย่างของแนวทางนี้ใน การฝึกอบรม Multi-worker กับ Keras
สร้างอินสแตนซ์ ParameterServerStrategy
ก่อนที่คุณจะดำดิ่งลงไปในโค้ดการฝึก เรามาสร้างอินสแตนซ์ของอ็อบเจกต์ ParameterServerStrategy
กันเสียก่อน โปรดทราบว่าสิ่งนี้จำเป็น ไม่ว่าคุณจะใช้ Model.fit
หรือ Training Loop แบบกำหนดเองก็ตาม อาร์กิวเมนต์ variable_partitioner
จะอธิบายไว้ในส่วนการแบ่งกลุ่มย่อยของ ตัวแปร
variable_partitioner = (
tf.distribute.experimental.partitioners.MinSizePartitioner(
min_shard_bytes=(256 << 10),
max_shards=NUM_PS))
strategy = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver,
variable_partitioner=variable_partitioner)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'ps': ['localhost:19069', 'localhost:19342'], 'worker': ['localhost:18007', 'localhost:20252', 'localhost:23613']}) INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0' INFO:tensorflow:Number of GPUs on workers: 1
หากต้องการใช้ GPU ในการฝึกอบรม ให้จัดสรร GPU ที่พนักงานแต่ละคนมองเห็นได้ ParameterServerStrategy
จะใช้ GPU ที่มีอยู่ทั้งหมดสำหรับผู้ปฏิบัติงานแต่ละคน โดยมีข้อจำกัดว่าพนักงานทุกคนควรมี GPU จำนวนเท่ากัน
การแบ่งส่วนตัวแปร
การแบ่งส่วนตัวแปรหมายถึงการแบ่งตัวแปรออกเป็นตัวแปรย่อยๆ หลายๆ ตัว ซึ่งเรียกว่า ชา ร์ด การแบ่งกลุ่มย่อยแบบแปรผันอาจมีประโยชน์ในการกระจายโหลดของเครือข่ายเมื่อเข้าถึงชาร์ดเหล่านี้ นอกจากนี้ยังเป็นประโยชน์ในการกระจายการคำนวณและการจัดเก็บตัวแปรปกติในเซิร์ฟเวอร์พารามิเตอร์หลายตัว
ในการเปิดใช้งานการแบ่งกลุ่มย่อยตัวแปร คุณสามารถส่งผ่าน variable_partitioner
เมื่อสร้างอ็อบเจ็กต์ ParameterServerStrategy
variable_partitioner
จะถูกเรียกใช้ทุกครั้งที่สร้างตัวแปร และคาดว่าจะส่งคืนจำนวนส่วนแบ่งข้อมูลตามแต่ละส่วนของตัวแปร variable_partitioner
บางตัวที่พร้อมใช้งาน เช่น tf.distribute.experimental.partitioners.MinSizePartitioner
ขอแนะนำให้ใช้ตัวแบ่งพาร์ติชันตามขนาด เช่น tf.distribute.experimental.partitioners.MinSizePartitioner
เพื่อหลีกเลี่ยงการแบ่งพาร์ติชันตัวแปรขนาดเล็ก ซึ่งอาจส่งผลเสียต่อความเร็วในการฝึกโมเดล
เมื่อ variable_partitioner
ถูกส่งผ่านไป และหากคุณสร้างตัวแปรโดยตรงภายใต้ strategy.scope()
ตัวแปรนั้นจะกลายเป็นประเภทคอนเทนเนอร์ที่มีคุณสมบัติ variables
ซึ่งให้การเข้าถึงรายการชาร์ด ในกรณีส่วนใหญ่ คอนเทนเนอร์นี้จะถูกแปลงเป็นเทนเซอร์โดยอัตโนมัติโดยการต่อชาร์ดทั้งหมดเข้าด้วยกัน เป็นผลให้สามารถใช้เป็นตัวแปรปกติได้ ในทางกลับกัน บางวิธีของ TensorFlow เช่น tf.nn.embedding_lookup
ให้การใช้งานที่มีประสิทธิภาพสำหรับคอนเทนเนอร์ประเภทนี้ และจะหลีกเลี่ยงการต่อกันอัตโนมัติในวิธีการเหล่านี้
โปรดดูเอกสาร API ของ tf.distribute.experimental.ParameterServerStrategy
สำหรับรายละเอียดเพิ่มเติม
เทรนนิ่งกับ Model.fit
Keras จัดเตรียม API การฝึกอบรมที่ใช้งานง่ายผ่าน Model.fit
ที่จัดการลูปการฝึกภายใต้ประทุน ด้วยความยืดหยุ่นของ train_step
ที่เขียนทับได้ และการเรียกกลับ ซึ่งมีฟังก์ชันต่างๆ เช่น การบันทึกจุดตรวจหรือการบันทึกสรุปสำหรับ TensorBoard ด้วย Model.fit
รหัสการฝึกอบรมเดียวกันสามารถใช้สำหรับกลยุทธ์อื่นด้วยการสลับวัตถุกลยุทธ์อย่างง่าย
ป้อนข้อมูล
Model.fit
ที่มีการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์กำหนดให้ป้อนข้อมูลใน callable ที่รับอาร์กิวเมนต์ประเภท tf.distribute.InputContext
และส่งคืน tf.data.Dataset
จากนั้น สร้างอ็อบเจ็กต์ tf.keras.utils.experimental.DatasetCreator
ที่รับ callable
ดังกล่าว และอ็อบเจ็กต์ tf.distribute.InputOptions
ทางเลือก ผ่านอาร์กิวเมนต์ input_options
โปรดทราบว่าขอแนะนำให้สับเปลี่ยนและทำซ้ำข้อมูลด้วยการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ และระบุ steps_per_epoch
ในการเรียก fit
เพื่อให้ไลบรารีทราบขอบเขตของยุค
โปรดดูบทแนะนำการ ป้อนข้อมูลแบบกระจาย สำหรับข้อมูลเพิ่มเติมเกี่ยวกับอาร์กิวเมนต์ InputContext
def dataset_fn(input_context):
global_batch_size = 64
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
x = tf.random.uniform((10, 10))
y = tf.random.uniform((10,))
dataset = tf.data.Dataset.from_tensor_slices((x, y)).shuffle(10).repeat()
dataset = dataset.shard(
input_context.num_input_pipelines,
input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2)
return dataset
dc = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
รหัสใน dataset_fn
จะถูกเรียกใช้บนอุปกรณ์อินพุต ซึ่งมักจะเป็น CPU ในเครื่องของผู้ปฏิบัติงานแต่ละเครื่อง
การสร้างแบบจำลองและการรวบรวม
ตอนนี้ คุณจะสร้าง tf.keras.Model
— tf.keras.models.Sequential
model สำหรับจุดประสงค์ในการสาธิต—ตามด้วยการเรียก Model.compile
เพื่อรวมส่วนประกอบต่างๆ เช่น ตัวเพิ่มประสิทธิภาพ ตัวชี้วัด หรือพารามิเตอร์ เช่น steps_per_execution
:
with strategy.scope():
model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
model.compile(tf.keras.optimizers.SGD(), loss='mse', steps_per_execution=10)
โทรกลับและการฝึกอบรม
ก่อนที่คุณจะเรียกใช้ model.fit
สำหรับการฝึกอบรมจริง มาเตรียมการเรียกกลับที่จำเป็นสำหรับงานทั่วไปก่อน เช่น:
-
ModelCheckpoint
: เพื่อบันทึกน้ำหนักของแบบจำลอง -
BackupAndRestore
: เพื่อให้แน่ใจว่ามีการสำรองข้อมูลความคืบหน้าของการฝึกอบรมโดยอัตโนมัติ และกู้คืนได้หากคลัสเตอร์ประสบกับความไม่พร้อมใช้งาน (เช่น การยกเลิกหรือการสั่งจอง) หรือ -
TensorBoard
: เพื่อบันทึกรายงานความคืบหน้าลงในไฟล์สรุป ซึ่งแสดงเป็นภาพในเครื่องมือ TensorBoard
working_dir = '/tmp/my_working_dir'
log_dir = os.path.join(working_dir, 'log')
ckpt_filepath = os.path.join(working_dir, 'ckpt')
backup_dir = os.path.join(working_dir, 'backup')
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir=log_dir),
tf.keras.callbacks.ModelCheckpoint(filepath=ckpt_filepath),
tf.keras.callbacks.BackupAndRestore(backup_dir=backup_dir),
]
model.fit(dc, epochs=5, steps_per_epoch=20, callbacks=callbacks)
Epoch 1/5 INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). 2022-01-26 05:32:01.399347: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 4s - loss: 0.5761 - 4s/epoch - 185ms/step Epoch 2/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.4423 - 561ms/epoch - 28ms/step Epoch 3/5 WARNING:tensorflow:5 out of the last 5 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f89783e7560> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets WARNING:tensorflow:6 out of the last 6 calls to <function MultiDeviceSaver.save.<locals>.tf_function_save at 0x7f897851f050> triggered tf.function retracing. Tracing is expensive and the excessive number of tracings could be due to (1) creating @tf.function repeatedly in a loop, (2) passing tensors with different shapes, (3) passing Python objects instead of tensors. For (1), please define your @tf.function outside of the loop. For (2), @tf.function has experimental_relax_shapes=True option that relaxes argument shapes that can avoid unnecessary retracing. For (3), please refer to https://www.tensorflow.org/guide/function#controlling_retracing and https://www.tensorflow.org/api_docs/python/tf/function for more details. 20/20 - 0s - loss: 0.3983 - 392ms/epoch - 20ms/step Epoch 4/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 1s - loss: 0.3592 - 507ms/epoch - 25ms/step Epoch 5/5 INFO:tensorflow:Assets written to: /tmp/my_working_dir/ckpt/assets 20/20 - 0s - loss: 0.2705 - 384ms/epoch - 19ms/step <keras.callbacks.History at 0x7f89984ca890>
การใช้งานโดยตรงกับ ClusterCoordinator
(ไม่บังคับ)
แม้ว่าคุณจะเลือกเส้นทางการฝึก Model.fit
คุณยังสามารถสร้างอินสแตนซ์อ็อบเจ็กต์ tf.distribute.experimental.coordinator.ClusterCoordinator
เพื่อจัดกำหนดการฟังก์ชันอื่นๆ ที่คุณต้องการให้ดำเนินการกับผู้ปฏิบัติงานได้ ดูส่วน การฝึกอบรมด้วยลูปการฝึกอบรมแบบกำหนดเอง สำหรับรายละเอียดเพิ่มเติมและตัวอย่าง
การฝึกด้วยลูปการฝึกแบบกำหนดเอง
การใช้ลูปการฝึกแบบกำหนดเองกับ tf.distribute.Strategy
ให้ความยืดหยุ่นอย่างมากในการกำหนดลูปการฝึก ด้วย ParameterServerStrategy
ที่กำหนดไว้ข้างต้น (เป็น strategy
) คุณจะใช้ tf.distribute.experimental.coordinator.ClusterCoordinator
เพื่อส่งการดำเนินการตามขั้นตอนการฝึกอบรมไปยังผู้ปฏิบัติงานระยะไกล
จากนั้น คุณจะสร้างแบบจำลอง กำหนดชุดข้อมูลและฟังก์ชันขั้นตอน ดังที่คุณได้ทำในลูปการฝึกกับ tf.distribute.Strategy
อื่น ๆ คุณสามารถดูรายละเอียดเพิ่มเติมได้ในการ ฝึกอบรมแบบกำหนดเองด้วยบทช่วยสอน tf.distribute.Strategy
เพื่อให้แน่ใจว่าการดึงข้อมูลชุดข้อมูลล่วงหน้ามีประสิทธิภาพ ให้ใช้ API การสร้างชุดข้อมูลแบบกระจายที่แนะนำที่กล่าวถึงในส่วน ขั้นตอนการฝึกอบรม Dispatch ไปยังผู้ปฏิบัติงานระยะไกล ด้านล่าง นอกจากนี้ อย่าลืมโทรหา Strategy.run
ภายใน worker_fn
เพื่อใช้ประโยชน์จาก GPU ที่จัดสรรให้กับพนักงานอย่างเต็มที่ ขั้นตอนที่เหลือจะเหมือนกันสำหรับการฝึกแบบมีหรือไม่มี GPU
มาสร้างส่วนประกอบเหล่านี้ในขั้นตอนต่อไปนี้:
ตั้งค่าข้อมูล
ขั้นแรก ให้เขียนฟังก์ชันที่สร้างชุดข้อมูลที่มีตรรกะการประมวลผลล่วงหน้าซึ่งใช้งานโดยเลเยอร์การประมวลผลล่วงหน้าของ Keras
คุณจะสร้างเลเยอร์เหล่านี้นอก dataset_fn
แต่ใช้การแปลงภายใน dataset_fn
เนื่องจากคุณจะรวม dataset_fn
ไว้ใน tf.function
ซึ่งไม่อนุญาตให้สร้างตัวแปรภายใน
feature_vocab = [
"avenger", "ironman", "batman", "hulk", "spiderman", "kingkong", "wonder_woman"
]
label_vocab = ["yes", "no"]
with strategy.scope():
feature_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=feature_vocab,
mask_token=None)
label_lookup_layer = tf.keras.layers.StringLookup(
vocabulary=label_vocab,
num_oov_indices=0,
mask_token=None)
raw_feature_input = tf.keras.layers.Input(
shape=(3,),
dtype=tf.string,
name="feature")
feature_id_input = feature_lookup_layer(raw_feature_input)
feature_preprocess_stage = tf.keras.Model(
{"features": raw_feature_input},
feature_id_input)
raw_label_input = tf.keras.layers.Input(
shape=(1,),
dtype=tf.string,
name="label")
label_id_input = label_lookup_layer(raw_label_input)
label_preprocess_stage = tf.keras.Model(
{"label": raw_label_input},
label_id_input)
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/numpy/core/numeric.py:2446: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison return bool(asarray(a1 == a2).all())
สร้างตัวอย่างของเล่นในชุดข้อมูล:
def feature_and_label_gen(num_examples=200):
examples = {"features": [], "label": []}
for _ in range(num_examples):
features = random.sample(feature_vocab, 3)
label = ["yes"] if "avenger" in features else ["no"]
examples["features"].append(features)
examples["label"].append(label)
return examples
examples = feature_and_label_gen()
จากนั้น สร้างชุดข้อมูลการฝึกอบรมที่ห่อหุ้ม dataset_fn
:
def dataset_fn(_):
raw_dataset = tf.data.Dataset.from_tensor_slices(examples)
train_dataset = raw_dataset.map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(200).batch(32).repeat()
return train_dataset
สร้างแบบจำลอง
ถัดไป สร้างแบบจำลองและวัตถุอื่นๆ อย่าลืมสร้างตัวแปรทั้งหมดภายใต้ strategy.scope
# These variables created under the `strategy.scope` will be placed on parameter
# servers in a round-robin fashion.
with strategy.scope():
# Create the model. The input needs to be compatible with Keras processing layers.
model_input = tf.keras.layers.Input(
shape=(3,), dtype=tf.int64, name="model_input")
emb_layer = tf.keras.layers.Embedding(
input_dim=len(feature_lookup_layer.get_vocabulary()), output_dim=16384)
emb_output = tf.reduce_mean(emb_layer(model_input), axis=1)
dense_output = tf.keras.layers.Dense(units=1, activation="sigmoid")(emb_output)
model = tf.keras.Model({"features": model_input}, dense_output)
optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.1)
accuracy = tf.keras.metrics.Accuracy()
มายืนยันว่าการใช้ FixedShardsPartitioner
แบ่งตัวแปรทั้งหมดออกเป็นสองส่วนย่อย และแต่ละส่วนถูกกำหนดให้กับเซิร์ฟเวอร์พารามิเตอร์ที่แตกต่างกัน:
assert len(emb_layer.weights) == 2
assert emb_layer.weights[0].shape == (4, 16384)
assert emb_layer.weights[1].shape == (4, 16384)
assert emb_layer.weights[0].device == "/job:ps/replica:0/task:0/device:CPU:0"
assert emb_layer.weights[1].device == "/job:ps/replica:0/task:1/device:CPU:0"
กำหนดขั้นตอนการฝึก
ประการที่สาม สร้างขั้นตอนการฝึกอบรมที่รวมไว้ใน tf.function
:
@tf.function
def step_fn(iterator):
def replica_fn(batch_data, labels):
with tf.GradientTape() as tape:
pred = model(batch_data, training=True)
per_example_loss = tf.keras.losses.BinaryCrossentropy(
reduction=tf.keras.losses.Reduction.NONE)(labels, pred)
loss = tf.nn.compute_average_loss(per_example_loss)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
accuracy.update_state(labels, actual_pred)
return loss
batch_data, labels = next(iterator)
losses = strategy.run(replica_fn, args=(batch_data, labels))
return strategy.reduce(tf.distribute.ReduceOp.SUM, losses, axis=None)
ในฟังก์ชันขั้นตอนการฝึกอบรมด้านบน การเรียก Strategy.run
และ Strategy.reduce
ใน step_fn
สามารถรองรับ GPU ได้หลายตัวต่อพนักงานหนึ่งคน หากผู้ปฏิบัติงานมีการจัดสรร GPU Strategy.run
จะแจกจ่ายชุดข้อมูลบนแบบจำลองหลายรายการ
ส่งขั้นตอนการฝึกอบรมให้กับพนักงานทางไกล
หลังจากที่การคำนวณทั้งหมดถูกกำหนดโดย ParameterServerStrategy
คุณจะใช้คลาส tf.distribute.experimental.coordinator.ClusterCoordinator
เพื่อสร้างทรัพยากรและแจกจ่ายขั้นตอนการฝึกอบรมให้กับผู้ปฏิบัติงานระยะไกล
ขั้นแรกให้สร้างวัตถุ ClusterCoordinator
และส่งผ่านในวัตถุกลยุทธ์:
coordinator = tf.distribute.experimental.coordinator.ClusterCoordinator(strategy)
จากนั้น สร้างชุดข้อมูลต่อผู้ปฏิบัติงานและตัววนซ้ำ ใน per_worker_dataset_fn
ด้านล่าง แนะนำให้ห่อ dataset_fn
ลงใน strategy.distribute_datasets_from_function
เพื่อให้การดึงข้อมูลล่วงหน้าไปยัง GPU ล่วงหน้าอย่างมีประสิทธิภาพได้อย่างราบรื่น
@tf.function
def per_worker_dataset_fn():
return strategy.distribute_datasets_from_function(dataset_fn)
per_worker_dataset = coordinator.create_per_worker_dataset(per_worker_dataset_fn)
per_worker_iterator = iter(per_worker_dataset)
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,).
ขั้นตอนสุดท้ายคือการกระจายการคำนวณไปยังผู้ปฏิบัติงานระยะไกลโดยใช้ ClusterCoordinator.schedule
:
- วิธีการ
schedule
เข้าคิวtf.function
และส่งคืนRemoteValue
ที่เหมือนอนาคตทันที ฟังก์ชันที่อยู่ในคิวจะถูกส่งไปยังผู้ปฏิบัติงานระยะไกลในเธรดพื้นหลัง และRemoteValue
จะถูกเติมแบบอะซิงโครนัส - วิธีการ
join
(ClusterCoordinator.join
) สามารถใช้เพื่อรอจนกว่าฟังก์ชันที่กำหนดเวลาไว้ทั้งหมดจะถูกดำเนินการ
num_epoches = 4
steps_per_epoch = 5
for i in range(num_epoches):
accuracy.reset_states()
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
# Wait at epoch boundaries.
coordinator.join()
print ("Finished epoch %d, accuracy is %f." % (i, accuracy.result().numpy()))
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',). Finished epoch 0, accuracy is 0.543750. Finished epoch 1, accuracy is 0.543750. Finished epoch 2, accuracy is 0.950000. Finished epoch 3, accuracy is 1.000000.
นี่คือวิธีที่คุณสามารถดึงผลลัพธ์ของ RemoteValue
:
loss = coordinator.schedule(step_fn, args=(per_worker_iterator,))
print ("Final loss is %f" % loss.fetch())
Final loss is 0.000000ตัวยึดตำแหน่ง23
อีกวิธีหนึ่ง คุณสามารถเริ่มขั้นตอนทั้งหมดและทำบางสิ่งในขณะที่รอให้เสร็จสิ้นได้:
for _ in range(total_steps):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
while not coordinator.done():
time.sleep(10)
# Do something like logging metrics or writing checkpoints.
สำหรับเวิร์กโฟลว์การฝึกอบรมและการให้บริการที่สมบูรณ์สำหรับตัวอย่างนี้ โปรดดู การทดสอบ นี้
ข้อมูลเพิ่มเติมเกี่ยวกับการสร้างชุดข้อมูล
ชุดข้อมูลในโค้ดด้านบนถูกสร้างขึ้นโดยใช้ ClusterCoordinator.create_per_worker_dataset
API) มันสร้างชุดข้อมูลหนึ่งชุดต่อผู้ปฏิบัติงานและส่งคืนวัตถุคอนเทนเนอร์ คุณสามารถเรียกใช้เมธอด iter
เพื่อสร้างตัววนซ้ำต่อคนได้ ตัววนซ้ำต่อผู้ปฏิบัติงานประกอบด้วยตัววนซ้ำหนึ่งตัวต่อผู้ปฏิบัติงาน และส่วนที่เกี่ยวข้องของผู้ปฏิบัติงานจะถูกแทนที่ในอาร์กิวเมนต์อินพุตของฟังก์ชันที่ส่งผ่านไปยังเมธอด ClusterCoordinator.schedule
ก่อนที่ฟังก์ชันจะดำเนินการกับผู้ปฏิบัติงานรายใดรายหนึ่ง
ในปัจจุบัน เมธอด ClusterCoordinator.schedule
ถือว่าผู้ปฏิบัติงานเทียบเท่า ดังนั้นจึงถือว่าชุดข้อมูลของผู้ปฏิบัติงานต่างกันเหมือนกัน ยกเว้นอาจถูกสับเปลี่ยนต่างกันหากมีการดำเนินการ Dataset.shuffle
ด้วยเหตุนี้ จึงขอแนะนำให้ใช้ชุดข้อมูลซ้ำโดยไม่มีกำหนด และคุณจัดกำหนดการขั้นตอนจำนวนจำกัด แทนที่จะอาศัย OutOfRangeError
จากชุดข้อมูล
หมายเหตุสำคัญอีกประการหนึ่งคือ ชุดข้อมูล tf.data
ไม่สนับสนุนการทำให้เป็นอนุกรมและดีซีเรียลไลซ์เซชั่นโดยปริยายข้ามขอบเขตงาน ดังนั้นจึงเป็นสิ่งสำคัญที่จะสร้างชุดข้อมูลทั้งหมดภายในฟังก์ชันที่ส่งผ่านไปยัง ClusterCoordinator.create_per_worker_dataset
การประเมิน
มีมากกว่าหนึ่งวิธีในการกำหนดและเรียกใช้วงจรการประเมินในการฝึกอบรมแบบกระจาย แต่ละคนมีข้อดีและข้อเสียของตัวเองตามที่อธิบายไว้ด้านล่าง แนะนำให้ใช้วิธีการประเมินแบบอินไลน์หากคุณไม่ต้องการ
การประเมินแบบอินไลน์
ในวิธีนี้ ผู้ประสานงานจะสลับกันระหว่างการฝึกอบรมและการประเมิน ดังนั้นจึงเรียกว่า การประเมินแบบอินไลน์
การประเมินแบบอินไลน์มีประโยชน์หลายประการ ตัวอย่างเช่น:
- สามารถรองรับแบบจำลองการประเมินขนาดใหญ่และชุดข้อมูลการประเมินที่งานเดียวไม่สามารถถือครองได้
- ผลการประเมินสามารถใช้ในการตัดสินใจสำหรับการฝึกอบรมในยุคต่อไปได้
มีสองวิธีในการดำเนินการประเมินแบบอินไลน์: การประเมินโดยตรงและการประเมินแบบกระจาย
- การประเมินโดยตรง : สำหรับแบบจำลองขนาดเล็กและชุดข้อมูลการประเมิน ผู้ประสานงานสามารถเรียกใช้การประเมินโดยตรงบนแบบจำลองแบบกระจายด้วยชุดข้อมูลการประเมินบนผู้ประสานงาน:
eval_dataset = tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).batch(8)
eval_accuracy = tf.keras.metrics.Accuracy()
for batch_data, labels in eval_dataset:
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). Evaluation accuracy: 1.000000
- การประเมินแบบกระจาย : สำหรับโมเดลขนาดใหญ่หรือชุดข้อมูลที่ไม่สามารถรันได้โดยตรงบนผู้ประสานงาน งานผู้ประสานงานสามารถแจกจ่ายงานการประเมินให้กับผู้ปฏิบัติงานผ่าน
ClusterCoordinator.schedule
/ClusterCoordinator.join
:
with strategy.scope():
# Define the eval metric on parameter servers.
eval_accuracy = tf.keras.metrics.Accuracy()
@tf.function
def eval_step(iterator):
def replica_fn(batch_data, labels):
pred = model(batch_data, training=False)
actual_pred = tf.cast(tf.greater(pred, 0.5), tf.int64)
eval_accuracy.update_state(labels, actual_pred)
batch_data, labels = next(iterator)
strategy.run(replica_fn, args=(batch_data, labels))
def eval_dataset_fn():
return tf.data.Dataset.from_tensor_slices(
feature_and_label_gen(num_examples=16)).map(
lambda x: (
{"features": feature_preprocess_stage(x["features"])},
label_preprocess_stage(x["label"])
)).shuffle(16).repeat().batch(8)
per_worker_eval_dataset = coordinator.create_per_worker_dataset(eval_dataset_fn)
per_worker_eval_iterator = iter(per_worker_eval_dataset)
eval_steps_per_epoch = 2
for _ in range(eval_steps_per_epoch):
coordinator.schedule(eval_step, args=(per_worker_eval_iterator,))
coordinator.join()
print ("Evaluation accuracy: %f" % eval_accuracy.result())
WARNING:tensorflow:Model was constructed with shape (None, 3) for input KerasTensor(type_spec=TensorSpec(shape=(None, 3), dtype=tf.string, name='feature'), name='feature', description="created by layer 'feature'"), but it was called on an input with incompatible shape (3,). WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources WARNING:tensorflow:1 GPUs are allocated per worker. Please use DistributedDataset by calling strategy.experimental_distribute_dataset or strategy.distribute_datasets_from_function to make best use of GPU resources Evaluation accuracy: 1.000000
การประเมินด้านข้างรถ
อีกวิธีหนึ่งเรียกว่า การประเมินรถด้านข้าง ซึ่งคุณสร้างงานผู้ประเมินเฉพาะที่อ่านจุดตรวจซ้ำๆ และดำเนินการประเมินบนจุดตรวจล่าสุด จะช่วยให้โปรแกรมการฝึกอบรมของคุณเสร็จสิ้นก่อนเวลาอันควร หากคุณไม่ต้องการเปลี่ยนรอบการฝึกตามผลการประเมิน อย่างไรก็ตาม จำเป็นต้องมีงานผู้ประเมินเพิ่มเติมและจุดตรวจเป็นระยะเพื่อกระตุ้นการประเมิน ต่อไปนี้เป็นวงจรการประเมินด้านข้างรถที่เป็นไปได้:
checkpoint_dir = ...
eval_model = ...
eval_data = ...
checkpoint = tf.train.Checkpoint(model=eval_model)
for latest_checkpoint in tf.train.checkpoints_iterator(
checkpoint_dir):
try:
checkpoint.restore(latest_checkpoint).expect_partial()
except (tf.errors.OpError,) as e:
# checkpoint may be deleted by training when it is about to read it.
continue
# Optionally add callbacks to write summaries.
eval_model.evaluate(eval_data)
# Evaluation finishes when it has evaluated the last epoch.
if latest_checkpoint.endswith('-{}'.format(train_epoches)):
break
กลุ่มในโลกแห่งความเป็นจริง
ในสภาพแวดล้อมการผลิตจริง คุณจะรันงานทั้งหมดในกระบวนการต่าง ๆ บนเครื่องที่แตกต่างกัน วิธีที่ง่ายที่สุดในการกำหนดค่าข้อมูลคลัสเตอร์ในแต่ละงานคือการตั้งค่าตัวแปรสภาพแวดล้อม "TF_CONFIG"
และใช้ tf.distribute.cluster_resolver.TFConfigClusterResolver
เพื่อแยกวิเคราะห์ "TF_CONFIG"
สำหรับคำอธิบายทั่วไปเกี่ยวกับตัวแปรสภาพแวดล้อม "TF_CONFIG"
โปรดดูคู่มือ การฝึกอบรมแบบกระจาย
หากคุณเริ่มการฝึกอบรมโดยใช้ Kubernetes หรือเทมเพลตการกำหนดค่าอื่นๆ มีแนวโน้มสูงที่เทมเพลตเหล่านี้จะตั้งค่า “TF_CONFIG"
ให้คุณแล้ว
ตั้งค่าตัวแปรสภาพแวดล้อม "TF_CONFIG"
สมมติว่าคุณมี 3 ผู้ปฏิบัติงานและ 2 เซิร์ฟเวอร์พารามิเตอร์ "TF_CONFIG"
ของผู้ปฏิบัติงาน 1 สามารถเป็น:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
"TF_CONFIG"
ของผู้ประเมินสามารถ:
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"evaluator": ["host7:port"]
},
"task": {"type": "evaluator", "index": 0}
})
ส่วน "cluster"
ในสตริง "TF_CONFIG"
ด้านบนสำหรับผู้ประเมินเป็นทางเลือก
หากคุณใช้เลขฐานสองเดียวกันสำหรับงานทั้งหมด
หากคุณต้องการรันงานเหล่านี้ทั้งหมดโดยใช้ไบนารีเดียว คุณจะต้องปล่อยให้โปรแกรมของคุณแยกสาขาออกเป็นบทบาทต่างๆ ในตอนเริ่มต้น:
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()
if cluster_resolver.task_type in ("worker", "ps"):
# Start a TensorFlow server and wait.
elif cluster_resolver.task_type == "evaluator":
# Run side-car evaluation
else:
# Run the coordinator.
รหัสต่อไปนี้เริ่มต้นเซิร์ฟเวอร์ TensorFlow และรอ:
# Set the environment variable to allow reporting worker and ps failure to the
# coordinator. This is a workaround and won't be necessary in the future.
os.environ["GRPC_FAIL_FAST"] = "use_caller"
server = tf.distribute.Server(
cluster_resolver.cluster_spec(),
job_name=cluster_resolver.task_type,
task_index=cluster_resolver.task_id,
protocol=cluster_resolver.rpc_layer or "grpc",
start=True)
server.join()
การจัดการความล้มเหลวของงาน
ความล้มเหลวของผู้ปฏิบัติงาน
tf.distribute.experimental.coordinator.ClusterCoordinator
หรือ Model.fit
ให้ความทนทานต่อข้อผิดพลาดในตัวสำหรับความล้มเหลวของผู้ปฏิบัติงาน เมื่อกู้คืนผู้ปฏิบัติงาน ฟังก์ชันชุดข้อมูลที่ให้ไว้ก่อนหน้านี้ (ทั้ง ClusterCoordinator.create_per_worker_dataset
สำหรับลูปการฝึกที่กำหนดเอง หรือ tf.keras.utils.experimental.DatasetCreator
สำหรับ Model.fit
) จะถูกเรียกใช้บนคนงานเพื่อสร้างชุดข้อมูลใหม่
เซิร์ฟเวอร์พารามิเตอร์หรือความล้มเหลวของผู้ประสานงาน
อย่างไรก็ตาม เมื่อผู้ประสานงานเห็นข้อผิดพลาดของเซิร์ฟเวอร์พารามิเตอร์ จะทำให้เกิด UnavailableError
หรือ AbortedError
ทันที คุณสามารถรีสตาร์ทผู้ประสานงานได้ในกรณีนี้ ผู้ประสานงานเองก็อาจใช้งานไม่ได้เช่นกัน ดังนั้นจึงแนะนำให้ใช้เครื่องมือบางอย่างเพื่อไม่ให้สูญเสียความคืบหน้าในการฝึกอบรม:
สำหรับ
Model.fit
คุณควรใช้การเรียกกลับBackupAndRestore
ซึ่งจัดการการบันทึกและกู้คืนความคืบหน้าโดยอัตโนมัติ ดูตัวอย่างการ โทรกลับและการฝึกอบรม ด้านบนสำหรับลูปการฝึกแบบกำหนดเอง คุณควรตรวจสอบตัวแปรโมเดลเป็นระยะๆ และโหลดตัวแปรโมเดลจากจุดตรวจสอบ หากมี ก่อนที่การฝึกจะเริ่มต้น ความคืบหน้าของการฝึกอบรมสามารถอนุมานได้โดยประมาณจาก
optimizer.iterations
การวนซ้ำ หากจุดตรวจสอบเครื่องมือเพิ่มประสิทธิภาพ:
checkpoint_manager = tf.train.CheckpointManager(
tf.train.Checkpoint(model=model, optimizer=optimizer),
checkpoint_dir,
max_to_keep=3)
if checkpoint_manager.latest_checkpoint:
checkpoint = checkpoint_manager.checkpoint
checkpoint.restore(
checkpoint_manager.latest_checkpoint).assert_existing_objects_matched()
global_steps = int(optimizer.iterations.numpy())
starting_epoch = global_steps // steps_per_epoch
for _ in range(starting_epoch, num_epoches):
for _ in range(steps_per_epoch):
coordinator.schedule(step_fn, args=(per_worker_iterator,))
coordinator.join()
checkpoint_manager.save()
กำลังดึงค่า RemoteValue
การดึงข้อมูล RemoteValue
รับประกันว่าจะสำเร็จหากดำเนินการฟังก์ชันสำเร็จ เนื่องจากขณะนี้ค่าตอบแทนจะถูกคัดลอกไปยังผู้ประสานงานทันทีหลังจากดำเนินการฟังก์ชัน หากมีข้อผิดพลาดของผู้ปฏิบัติงานในระหว่างการคัดลอก ฟังก์ชันนี้จะถูกลองอีกครั้งกับผู้ปฏิบัติงานอื่นที่มีอยู่ ดังนั้น ถ้าคุณต้องการปรับให้เหมาะสมสำหรับประสิทธิภาพ คุณสามารถกำหนดเวลาฟังก์ชันโดยไม่มีค่าส่งคืนได้
การรายงานข้อผิดพลาด
เมื่อผู้ประสานงานพบข้อผิดพลาด เช่น UnavailableError
จากเซิร์ฟเวอร์พารามิเตอร์ หรือข้อผิดพลาดของแอปพลิเคชันอื่นๆ เช่น InvalidArgument
จาก tf.debugging.check_numerics
ระบบจะยกเลิกฟังก์ชันที่รอดำเนินการและอยู่ในคิวทั้งหมดก่อนที่จะสร้างข้อผิดพลาด การดึงข้อมูล RemoteValue
ที่เกี่ยวข้องจะทำให้เกิด CancelledError
หลังจากเกิดข้อผิดพลาด ผู้ประสานงานจะไม่แจ้งข้อผิดพลาดเดิมหรือข้อผิดพลาดใดๆ จากฟังก์ชันที่ถูกยกเลิก
การปรับปรุงประสิทธิภาพ
มีสาเหตุที่เป็นไปได้หลายประการ หากคุณพบปัญหาด้านประสิทธิภาพเมื่อคุณฝึกด้วย ParameterServerStrategy
และ ClusterResolver
สาเหตุทั่วไปประการหนึ่งคือเซิร์ฟเวอร์พารามิเตอร์มีโหลดที่ไม่สมดุล และเซิร์ฟเวอร์พารามิเตอร์ที่โหลดหนักบางตัวมีความจุถึงขีดจำกัดแล้ว นอกจากนี้ยังสามารถมีได้หลายสาเหตุ วิธีง่ายๆ ในการแก้ไขปัญหานี้คือ:
- แบ่งส่วนตัวแปรโมเดลขนาดใหญ่ของคุณผ่านการระบุ
variable_partitioner
เมื่อสร้างParameterServerStrategy
- หลีกเลี่ยงการสร้างตัวแปรฮอตสปอตที่เซิร์ฟเวอร์พารามิเตอร์ทั้งหมดต้องการในขั้นตอนเดียว ถ้าเป็นไปได้ ตัวอย่างเช่น ใช้อัตราการเรียนรู้คงที่หรือคลาสย่อย
tf.keras.optimizers.schedules.LearningRateSchedule
ในตัวเพิ่มประสิทธิภาพเนื่องจากพฤติกรรมเริ่มต้นคืออัตราการเรียนรู้จะกลายเป็นตัวแปรที่วางอยู่บนเซิร์ฟเวอร์พารามิเตอร์เฉพาะและร้องขอโดยเซิร์ฟเวอร์พารามิเตอร์อื่นทั้งหมดในแต่ละขั้นตอน . - สับเปลี่ยนคำศัพท์ขนาดใหญ่ของคุณก่อนที่จะส่งต่อไปยังเลเยอร์การประมวลผลล่วงหน้าของ Keras
อีกสาเหตุที่เป็นไปได้สำหรับปัญหาด้านประสิทธิภาพคือผู้ประสานงาน การใช้งาน schedule
/ join
ครั้งแรกของคุณเป็นแบบ Python และอาจมีค่าใช้จ่ายในการทำเธรด นอกจากนี้ เวลาแฝงระหว่างผู้ประสานงานและพนักงานอาจมีขนาดใหญ่ หากเป็นกรณีนี้
สำหรับ
Model.fit
คุณสามารถตั้งค่าอาร์กิวเมนต์steps_per_execution
ที่Model.compile
เป็นค่าที่มากกว่า 1สำหรับลูปการฝึกแบบกำหนดเอง คุณสามารถแพ็คหลายขั้นตอนไว้ใน
tf.function
เดียว:
steps_per_invocation = 10
@tf.function
def step_fn(iterator):
for _ in range(steps_per_invocation):
features, labels = next(iterator)
def replica_fn(features, labels):
...
strategy.run(replica_fn, args=(features, labels))
เนื่องจากไลบรารีได้รับการปรับให้เหมาะสมยิ่งขึ้น หวังว่าผู้ใช้ส่วนใหญ่จะไม่ต้องแพ็คขั้นตอนด้วยตนเองอีกในอนาคต
นอกจากนี้ เคล็ดลับเล็กๆ น้อยๆ สำหรับการปรับปรุงประสิทธิภาพคือการจัดกำหนดการฟังก์ชันโดยไม่มีค่าส่งคืนตามที่อธิบายไว้ในส่วนการจัดการความล้มเหลวของงานด้านบน
ข้อจำกัดที่ทราบ
ข้อจำกัดที่ทราบส่วนใหญ่มีอยู่แล้วในหัวข้อข้างต้น ส่วนนี้ให้ข้อมูลสรุป
ParameterServerStrategy
ทั่วไป
-
os.environment["grpc_fail_fast"]="use_caller"
เป็นสิ่งจำเป็นในทุกงานรวมถึงผู้ประสานงาน เพื่อให้การทนต่อข้อผิดพลาดทำงานได้อย่างถูกต้อง - ไม่รองรับการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์แบบซิงโครนัส
- โดยปกติจำเป็นต้องรวมหลายขั้นตอนไว้ในฟังก์ชันเดียวเพื่อให้ได้ประสิทธิภาพสูงสุด
- ไม่รองรับการโหลด save_model ผ่าน
tf.saved_model.load
ที่มีตัวแปรชาร์ด หมายเหตุการโหลด save_model โดยใช้ TensorFlow Serving นั้นคาดว่าจะใช้งานได้ - ไม่รองรับการโหลดจุดตรวจสอบที่มีตัวแปรช่องเครื่องมือเพิ่มประสิทธิภาพการแบ่งส่วนข้อมูลในส่วนแบ่งข้อมูลจำนวนอื่น
- ไม่สนับสนุนการกู้คืนจากความล้มเหลวของเซิร์ฟเวอร์พารามิเตอร์โดยไม่ต้องรีสตาร์ทงานผู้ประสานงาน
- การใช้
tf.lookup.StaticHashTable
(ซึ่งโดยทั่วไปจะใช้โดยเลเยอร์การประมวลผลล่วงหน้าของ Keras เช่นtf.keras.layers.IntegerLookup
,tf.keras.layers.StringLookup
และtf.keras.layers.TextVectorization
) ส่งผลให้เกิดทรัพยากรที่วางอยู่บน ผู้ประสานงานในเวลานี้กับการฝึกอบรมเซิร์ฟเวอร์พารามิเตอร์ สิ่งนี้มีผลกระทบด้านประสิทธิภาพสำหรับการค้นหา RPC จากผู้ปฏิบัติงานไปยังผู้ประสานงาน นี้เป็นลำดับความสำคัญสูงในปัจจุบันที่จะกล่าวถึง
Model.fit
เฉพาะ
- จำเป็นต้องมีอาร์กิวเมนต์
steps_per_epoch
ในModel.fit
คุณสามารถเลือกค่าที่ให้ช่วงเวลาที่เหมาะสมในยุคหนึ่งได้ -
ParameterServerStrategy
ไม่รองรับการเรียกกลับแบบกำหนดเองที่มีการเรียกระดับแบทช์เพื่อเหตุผลด้านประสิทธิภาพ คุณควรแปลงการโทรเหล่านั้นเป็นการเรียกระดับ epoch โดยเลือกsteps_per_epoch
อย่างเหมาะสม เพื่อให้ถูกเรียกทุกจำนวนsteps_per_epoch
การเรียกกลับในตัวจะไม่ได้รับผลกระทบ: การเรียกระดับแบทช์ของพวกเขาได้รับการแก้ไขเพื่อให้มีประสิทธิภาพ กำลังวางแผนรองรับการเรียกระดับชุดงานสำหรับParameterServerStrategy
- ด้วยเหตุผลเดียวกัน แถบความคืบหน้าและเมตริกต่างจากกลยุทธ์อื่นๆ ที่บันทึกที่ขอบเขตยุคเท่านั้น
- ไม่รองรับ
run_eagerly
ข้อมูลเฉพาะของลูปการฝึกแบบกำหนดเอง
-
ClusterCoordinator.schedule
ไม่รองรับการรับประกันการเข้าชมสำหรับชุดข้อมูล - เมื่อใช้
ClusterCoordinator.create_per_worker_dataset
ชุดข้อมูลทั้งหมดจะต้องถูกสร้างขึ้นภายในฟังก์ชันที่ส่งผ่านไปยังชุดข้อมูลนั้น -
tf.data.Options
จะถูกละเว้นในชุดข้อมูลที่สร้างโดยClusterCoordinator.create_per_worker_dataset