ดูบน TensorFlow.org | ทำงานใน Google Colab | ดูแหล่งที่มาบน GitHub | ดาวน์โหลดโน๊ตบุ๊ค |
tf.distribute API ช่วยให้ผู้ใช้สามารถปรับขนาดการฝึกอบรมจากเครื่องเดียวไปยังหลายเครื่องได้ เมื่อปรับขนาดโมเดล ผู้ใช้ยังต้องกระจายข้อมูลเข้าในอุปกรณ์หลายเครื่อง tf.distribute
จัดเตรียม API ซึ่งคุณสามารถแจกจ่ายอินพุตของคุณผ่านอุปกรณ์ต่างๆ ได้โดยอัตโนมัติ
คู่มือนี้จะแสดงวิธีต่างๆ ที่คุณสามารถสร้างชุดข้อมูลแบบกระจายและตัววนซ้ำโดยใช้ tf.distribute
API นอกจากนี้ หัวข้อต่อไปนี้จะครอบคลุม:
- ตัวเลือกการใช้งาน การแบ่งกลุ่ม และการแบ่งกลุ่มเมื่อใช้
tf.distribute.Strategy.experimental_distribute_dataset
และtf.distribute.Strategy.distribute_datasets_from_function
- วิธีต่างๆ ที่คุณสามารถวนซ้ำชุดข้อมูลแบบกระจาย
- ความแตกต่างระหว่าง
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
APIs และtf.data
API รวมถึงข้อจำกัดใดๆ ที่ผู้ใช้อาจพบในการใช้งาน
คู่มือนี้ไม่ครอบคลุมการใช้อินพุตแบบกระจายกับ Keras API
ชุดข้อมูลแบบกระจาย
หากต้องการใช้ tf.distribute
APIs เพื่อปรับขนาด ขอแนะนำให้ผู้ใช้ใช้ tf.data.Dataset
เพื่อเป็นตัวแทนของอินพุต tf.distribute
ถูกสร้างมาให้ทำงานอย่างมีประสิทธิภาพด้วย tf.data.Dataset
(เช่น การดึงข้อมูลล่วงหน้าอัตโนมัติไปยังอุปกรณ์เร่งความเร็วแต่ละเครื่อง) โดยมีการเพิ่มประสิทธิภาพการทำงานอย่างสม่ำเสมอในการนำไปใช้งาน หากคุณมีกรณีการใช้งานอย่างอื่นที่ไม่ใช่ tf.data.Dataset
โปรดดู ส่วน หลังในคู่มือนี้ ในการฝึกอบรมแบบไม่กระจาย ผู้ใช้จะสร้างอินสแตนซ์ tf.data.Dataset
ก่อน แล้วจึงวนซ้ำองค์ประกอบ ตัวอย่างเช่น:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
เพื่อให้ผู้ใช้สามารถใช้กลยุทธ์ tf.distribute
โดยมีการเปลี่ยนแปลงเล็กน้อยในโค้ดที่มีอยู่ของผู้ใช้ จึงมีการแนะนำ API สองรายการซึ่งจะแจกจ่ายอินสแตนซ์ tf.data.Dataset
และส่งคืนออบเจ็กต์ชุดข้อมูลแบบกระจาย ผู้ใช้สามารถวนซ้ำบนอินสแตนซ์ชุดข้อมูลแบบกระจายนี้และฝึกโมเดลของตนเหมือนเมื่อก่อน ให้เราดูที่ API ทั้งสอง - tf.distribute.Strategy.experimental_distribute_dataset
และ tf.distribute.Strategy.distribute_datasets_from_function
ในรายละเอียดเพิ่มเติม:
tf.distribute.Strategy.experimental_distribute_dataset
การใช้งาน
API นี้ใช้อินสแตนซ์ tf.data.Dataset
เป็นอินพุตและส่งคืนอินสแตนซ์ tf.distribute.DistributedDataset
คุณควรแบทช์ชุดข้อมูลอินพุตด้วยค่าที่เท่ากับขนาดแบทช์ส่วนกลาง ขนาดแบทช์ส่วนกลางนี้คือจำนวนตัวอย่างที่คุณต้องการประมวลผลในอุปกรณ์ทั้งหมดใน 1 ขั้นตอน คุณสามารถวนซ้ำชุดข้อมูลที่กระจายนี้ในรูปแบบ Pythonic หรือสร้างตัววนซ้ำโดยใช้ iter
ออบเจ็กต์ที่ส่งคืนไม่ใช่อินสแตนซ์ tf.data.Dataset
และไม่รองรับ API อื่นใดที่แปลงหรือตรวจสอบชุดข้อมูลไม่ว่าในทางใด นี่คือ API ที่แนะนำ หากคุณไม่มีวิธีเฉพาะเจาะจงที่คุณต้องการแบ่งข้อมูลอินพุตของคุณบนแบบจำลองต่างๆ
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>) 2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
คุณสมบัติ
แบทช์
tf.distribute
ทำการรีแบทช์อินพุตอินสแตนซ์ tf.data.Dataset
ด้วยขนาดแบทช์ใหม่ที่เท่ากับขนาดแบทช์ส่วนกลางหารด้วยจำนวนเรพลิกาในการซิงค์ จำนวนแบบจำลองที่ซิงค์จะเท่ากับจำนวนอุปกรณ์ที่มีส่วนร่วมในการไล่ระดับทั้งหมดลดระหว่างการฝึก เมื่อผู้ใช้เรียกใช้ตัววนซ้ำแบบกระจาย ข้อมูลขนาดแบทช์ next
แบบจำลองจะถูกส่งคืนในแต่ละแบบจำลอง คาร์ดินาลลิตี้ชุดข้อมูลที่รีแบทช์จะเป็นจำนวนของเรพลิกาเสมอ ต่อไปนี้คือตัวอย่างบางส่วน:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- ไม่มีการแจกจ่าย:
- รุ่นที่ 1: [0, 1, 2, 3]
- รุ่นที่ 2: [4, 5]
ด้วยการแจกจ่ายมากกว่า 2 แบบจำลอง ชุดสุดท้าย ([4, 5]) ถูกแบ่งระหว่าง 2 แบบจำลอง
รุ่นที่ 1:
- แบบจำลอง 1:[0, 1]
- แบบจำลอง 2:[2, 3]
ชุดที่ 2:
- แบบจำลอง 2: [4]
- แบบจำลอง 2: [5]
tf.data.Dataset.range(4).batch(4)
- ไม่มีการแจกจ่าย:
- รุ่นที่ 1: [[0], [1], [2], [3]]
- ด้วยการแจกจ่ายมากกว่า 5 แบบจำลอง:
- รุ่นที่ 1:
- แบบจำลอง 1: [0]
- แบบจำลอง 2: [1]
- แบบจำลอง 3: [2]
- แบบจำลอง 4: [3]
- แบบจำลอง 5: []
tf.data.Dataset.range(8).batch(4)
- ไม่มีการแจกจ่าย:
- รุ่นที่ 1: [0, 1, 2, 3]
- รุ่นที่ 2: [4, 5, 6, 7]
- ด้วยการแจกจ่ายมากกว่า 3 แบบจำลอง:
- รุ่นที่ 1:
- แบบจำลอง 1: [0, 1]
- แบบจำลอง 2: [2, 3]
- แบบจำลอง 3: []
- ชุดที่ 2:
- แบบจำลอง 1: [4, 5]
- แบบจำลอง 2: [6, 7]
- แบบจำลอง 3: []
การรีแบทช์ชุดข้อมูลมีความซับซ้อนของพื้นที่เพิ่มขึ้นตามจำนวนเรพลิกา ซึ่งหมายความว่าสำหรับกรณีการใช้งานการฝึกอบรมผู้ปฏิบัติงานหลายคน ไปป์ไลน์อินพุตสามารถเรียกใช้ข้อผิดพลาด OOM ได้
ชาร์ดิง
tf.distribute
ยังสร้างชาร์ดอัตโนมัติชุดข้อมูลอินพุตในการฝึกอบรมผู้ปฏิบัติงานหลายคนด้วย MultiWorkerMirroredStrategy
และ TPUStrategy
แต่ละชุดข้อมูลถูกสร้างขึ้นบนอุปกรณ์ CPU ของผู้ปฏิบัติงาน การแบ่งกลุ่มข้อมูลอัตโนมัติบนชุดของผู้ปฏิบัติงานหมายความว่าผู้ปฏิบัติงานแต่ละคนได้รับมอบหมายชุดย่อยของชุดข้อมูลทั้งหมด (หากตั้งค่า tf.data.experimental.AutoShardPolicy
ที่ถูกต้อง) ทั้งนี้เพื่อให้แน่ใจว่าในแต่ละขั้นตอน ขนาดแบทช์โกลบอลขององค์ประกอบชุดข้อมูลที่ไม่ทับซ้อนกันจะถูกประมวลผลโดยผู้ปฏิบัติงานแต่ละคน Autosharding มีตัวเลือกที่แตกต่างกันสองสามตัวที่สามารถระบุได้โดยใช้ tf.data.experimental.DistributeOptions
โปรดทราบว่าไม่มีการชาร์ดอัตโนมัติในการฝึกอบรมผู้ปฏิบัติงานหลายคนด้วย ParameterServerStrategy
และข้อมูลเพิ่มเติมเกี่ยวกับการสร้างชุดข้อมูลด้วยกลยุทธ์นี้ สามารถดูได้ในบทช่วย สอนกลยุทธ์เซิร์ฟเวอร์พารามิเตอร์
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
มีสามตัวเลือกที่แตกต่างกันซึ่งคุณสามารถตั้งค่าสำหรับ tf.data.experimental.AutoShardPolicy
:
- อัตโนมัติ: นี่เป็นตัวเลือกเริ่มต้นซึ่งหมายความว่าจะมีการพยายามแบ่งส่วนข้อมูลโดย FILE ความพยายามที่จะชาร์ดโดย FILE ล้มเหลวหากตรวจไม่พบชุดข้อมูลแบบไฟล์
tf.distribute
จะถอยกลับไปใช้การแบ่งกลุ่มตาม DATA โปรดทราบว่าหากชุดข้อมูลอินพุตเป็นแบบไฟล์ แต่จำนวนไฟล์น้อยกว่าจำนวนผู้ปฏิบัติงานInvalidArgumentError
จะเพิ่มขึ้น หากเกิดกรณีนี้ขึ้น ให้ตั้งค่านโยบายเป็นAutoShardPolicy.DATA
อย่างชัดเจน หรือแยกแหล่งที่มาอินพุตของคุณเป็นไฟล์ขนาดเล็กลง เพื่อให้จำนวนไฟล์มากกว่าจำนวนผู้ปฏิบัติงาน ไฟล์: นี่คือตัวเลือกถ้าคุณต้องการแบ่งไฟล์อินพุตให้กับผู้ปฏิบัติงานทั้งหมด คุณควรใช้ตัวเลือกนี้หากจำนวนไฟล์อินพุตมากกว่าจำนวนผู้ปฏิบัติงานมาก และข้อมูลในไฟล์มีการกระจายอย่างเท่าเทียมกัน ข้อเสียของตัวเลือกนี้คือมีผู้ปฏิบัติงานที่ไม่ได้ใช้งานหากข้อมูลในไฟล์ไม่กระจายอย่างเท่าเทียมกัน หากจำนวนไฟล์น้อยกว่าจำนวนผู้ปฏิบัติงาน
InvalidArgumentError
จะเพิ่มขึ้น หากเป็นเช่นนี้ ให้ตั้งค่านโยบายเป็นAutoShardPolicy.DATA
อย่างชัดเจน ตัวอย่างเช่น ให้เราแจกจ่ายไฟล์ 2 ไฟล์ให้กับผู้ปฏิบัติงาน 2 คนโดยแต่ละไฟล์จำลอง 1 รายการ ไฟล์ 1 ประกอบด้วย [0, 1, 2, 3, 4, 5] และไฟล์ 2 ประกอบด้วย [6, 7, 8, 9, 10, 11] ให้จำนวนเรพลิกาทั้งหมดในการซิงค์เป็น 2 และขนาดแบตช์ทั่วโลกคือ 4- คนทำงาน 0:
- แบทช์ 1 = แบบจำลอง 1: [0, 1]
- แบทช์ 2 = แบบจำลอง 1: [2, 3]
- แบทช์ 3 = แบบจำลอง 1: [4]
- แบทช์ 4 = แบบจำลอง 1: [5]
- คนงาน 1:
- แบทช์ 1 = แบบจำลอง 2: [6, 7]
- แบทช์ 2 = แบบจำลอง 2: [8, 9]
- แบทช์ 3 = แบบจำลอง 2: [10]
- แบทช์ 4 = แบบจำลอง 2: [11]
ข้อมูล: สิ่งนี้จะทำการชาร์ดอัตโนมัติขององค์ประกอบในผู้ปฏิบัติงานทั้งหมด ผู้ปฏิบัติงานแต่ละคนจะอ่านชุดข้อมูลทั้งหมดและประมวลผลเฉพาะชาร์ดที่ได้รับมอบหมายเท่านั้น ชาร์ดอื่นๆ ทั้งหมดจะถูกยกเลิก โดยทั่วไปจะใช้สิ่งนี้หากจำนวนไฟล์อินพุตน้อยกว่าจำนวนผู้ปฏิบัติงาน และคุณต้องการการแบ่งส่วนข้อมูลที่ดีขึ้นในผู้ปฏิบัติงานทั้งหมด ข้อเสียคือชุดข้อมูลทั้งหมดจะถูกอ่านสำหรับผู้ปฏิบัติงานแต่ละคน ตัวอย่างเช่น ให้เราแจกจ่าย 1 ไฟล์มากกว่า 2 คน ไฟล์ 1 ประกอบด้วย [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] ให้จำนวนการจำลองทั้งหมดที่ซิงค์เป็น 2
- คนทำงาน 0:
- แบทช์ 1 = แบบจำลอง 1: [0, 1]
- แบทช์ 2 = แบบจำลอง 1: [4, 5]
- แบทช์ 3 = แบบจำลอง 1: [8, 9]
- คนงาน 1:
- แบทช์ 1 = แบบจำลอง 2: [2, 3]
- แบทช์ 2 = แบบจำลอง 2: [6, 7]
- แบทช์ 3 = แบบจำลอง 2: [10, 11]
ปิด: หากคุณปิดการชาร์ทอัตโนมัติ พนักงานแต่ละคนจะประมวลผลข้อมูลทั้งหมด ตัวอย่างเช่น ให้เราแจกจ่าย 1 ไฟล์มากกว่า 2 คน ไฟล์ 1 ประกอบด้วย [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] ให้จำนวนการจำลองทั้งหมดที่ซิงค์เป็น 2 จากนั้นผู้ปฏิบัติงานแต่ละคนจะเห็นการแจกจ่ายต่อไปนี้:
- คนทำงาน 0:
- แบทช์ 1 = แบบจำลอง 1: [0, 1]
- แบทช์ 2 = แบบจำลอง 1: [2, 3]
- แบทช์ 3 = แบบจำลอง 1: [4, 5]
- แบทช์ 4 = แบบจำลอง 1: [6, 7]
- แบทช์ 5 = แบบจำลอง 1: [8, 9]
แบทช์ 6 = แบบจำลอง 1: [10, 11]
คนงาน 1:
แบทช์ 1 = แบบจำลอง 2: [0, 1]
แบทช์ 2 = แบบจำลอง 2: [2, 3]
แบทช์ 3 = แบบจำลอง 2: [4, 5]
แบทช์ 4 = แบบจำลอง 2: [6, 7]
แบทช์ 5 = แบบจำลอง 2: [8, 9]
แบทช์ 6 = แบบจำลอง 2: [10, 11]
กำลังดึงข้อมูลล่วงหน้า
โดยค่าเริ่มต้น tf.distribute
จะเพิ่มการแปลงการดึงข้อมูลล่วงหน้าที่ส่วนท้ายของอินสแตนซ์ tf.data.Dataset
ที่ผู้ใช้ระบุ อาร์กิวเมนต์สำหรับการแปลงค่าล่วงหน้าซึ่งเป็น buffer_size
เท่ากับจำนวนเรพลิกาที่ซิงค์
tf.distribute.Strategy.distribute_datasets_from_function
การใช้งาน
API นี้ใช้ฟังก์ชันอินพุตและส่งคืนอินสแตนซ์ tf.distribute.DistributedDataset
ฟังก์ชันอินพุตที่ผู้ใช้ส่งผ่านมีอาร์กิวเมนต์ tf.distribute.InputContext
และควรส่งคืนอินสแตนซ์ tf.data.Dataset
ด้วย API นี้ tf.distribute
จะไม่ทำการเปลี่ยนแปลงใดๆ เพิ่มเติมกับอินสแตนซ์ tf.data.Dataset
ของผู้ใช้ที่ส่งคืนจากฟังก์ชันอินพุต เป็นความรับผิดชอบของผู้ใช้ในการแบทช์และชาร์ดชุดข้อมูล tf.distribute
เรียกใช้ฟังก์ชันอินพุตบนอุปกรณ์ CPU ของผู้ปฏิบัติงานแต่ละคน นอกเหนือจากการอนุญาตให้ผู้ใช้ระบุตรรกะการแบ่งกลุ่มและการแบ่งกลุ่มของตัวเองแล้ว API นี้ยังแสดงให้เห็นถึงความสามารถในการปรับขนาดและประสิทธิภาพที่ดีขึ้นเมื่อเทียบกับ tf.distribute.Strategy.experimental_distribute_dataset
เมื่อใช้สำหรับการฝึกอบรมผู้ปฏิบัติงานหลายคน
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
คุณสมบัติ
แบทช์
อินสแตนซ์ tf.data.Dataset
ที่เป็นค่าส่งคืนของฟังก์ชันอินพุตควรถูกแบทช์โดยใช้ขนาดแบทช์ต่อแบบจำลอง ขนาดชุดงานต่อแบบจำลองคือขนาดชุดงานส่วนกลางหารด้วยจำนวนแบบจำลองที่เข้าร่วมในการฝึกการซิงค์ นี่เป็นเพราะ tf.distribute
เรียกใช้ฟังก์ชันอินพุตบนอุปกรณ์ CPU ของผู้ปฏิบัติงานแต่ละคน ชุดข้อมูลที่สร้างขึ้นจากผู้ปฏิบัติงานที่กำหนดควรพร้อมใช้งานโดยแบบจำลองทั้งหมดของผู้ปฏิบัติงานนั้น
ชาร์ดิง
ออบเจ็กต์ tf.distribute.InputContext
ที่ส่งผ่านโดยปริยายเป็นอาร์กิวเมนต์ไปยังฟังก์ชันอินพุตของผู้ใช้ ถูกสร้างขึ้นโดย tf.distribute
ภายใต้ประทุน มีข้อมูลเกี่ยวกับจำนวนผู้ปฏิบัติงาน รหัสผู้ปฏิบัติงานปัจจุบัน ฯลฯ ฟังก์ชันอินพุตนี้สามารถจัดการการแบ่งส่วนข้อมูลตามนโยบายที่กำหนดโดยผู้ใช้โดยใช้คุณสมบัติเหล่านี้ซึ่งเป็นส่วนหนึ่งของอ็อบเจ็กต์ tf.distribute.InputContext
กำลังดึงข้อมูลล่วงหน้า
tf.distribute
ไม่ได้เพิ่มการแปลง prefetch ที่ส่วนท้ายของ tf.data.Dataset
ที่ส่งคืนโดยผู้ใช้ที่ให้ฟังก์ชันอินพุต
Iterators แบบกระจาย
คล้ายกับอินสแตนซ์ tf.data.Dataset
ที่ไม่ได้แจกจ่าย คุณจะต้องสร้างตัววนซ้ำบนอินสแตนซ์ tf.distribute.DistributedDataset
เพื่อวนซ้ำและเข้าถึงองค์ประกอบใน tf.distribute.DistributedDataset
ต่อไปนี้เป็นวิธีที่คุณสามารถสร้าง tf.distribute.DistributedIterator
และใช้เพื่อฝึกโมเดลของคุณ:
ประเพณี
ใช้ Pythonic for loop construct
คุณสามารถใช้ลูป Pythonic ที่เป็นมิตรกับผู้ใช้เพื่อวนซ้ำบน tf.distribute.DistributedDataset
อิลิเมนต์ที่ส่งคืนจาก tf.distribute.DistributedIterator
สามารถเป็น tf.Tensor
เดียวหรือ tf.distribute.DistributedValues
ซึ่งมีค่าต่อการจำลอง การวางลูปใน tf.function
จะช่วยเพิ่มประสิทธิภาพ อย่างไรก็ตาม ขณะนี้ไม่รองรับการ break
และ return
สำหรับการวนซ้ำ tf.distribute.DistributedDataset
ที่วางอยู่ภายใน tf.function
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\020TensorDataset:29" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
ใช้ iter
เพื่อสร้างตัววนซ้ำที่ชัดเจน
ในการวนซ้ำองค์ประกอบในอินสแตนซ์ tf.distribute.DistributedDataset
คุณสามารถสร้าง tf.distribute.DistributedIterator
โดยใช้ iter
API ได้ ด้วยตัววนซ้ำที่ชัดเจน คุณสามารถทำซ้ำสำหรับจำนวนขั้นตอนที่แน่นอนได้ เพื่อรับองค์ประกอบถัดไปจากอินสแตนซ์ tf.distribute.DistributedIterator
dist_iterator
คุณสามารถเรียก next(dist_iterator)
, dist_iterator.get_next()
หรือ dist_iterator.get_next_as_optional()
สองอันแรกนั้นเหมือนกันโดยพื้นฐานแล้ว:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
ด้วย next()
หรือ tf.distribute.DistributedIterator.get_next()
หาก tf.distribute.DistributedIterator
ถึงจุดสิ้นสุด ข้อผิดพลาด OutOfRange จะถูกส่งออกไป ลูกค้าสามารถตรวจจับข้อผิดพลาดทางฝั่งหลามและทำงานอื่นๆ ต่อไปได้ เช่น ด่านตรวจและประเมินผล อย่างไรก็ตาม สิ่งนี้จะไม่ทำงานหากคุณใช้ลูปการฝึกโฮสต์ (เช่น รันหลายขั้นตอนต่อ tf.function
) ซึ่งมีลักษณะดังนี้:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
มีหลายขั้นตอนโดยการห่อตัวขั้นตอนภายใน tf.range
ในกรณีนี้ การวนซ้ำที่แตกต่างกันในลูปที่ไม่มีการขึ้นต่อกันสามารถเริ่มต้นพร้อมกันได้ ดังนั้นจึงสามารถทริกเกอร์ข้อผิดพลาด OutOfRange ในการวนซ้ำในภายหลังก่อนที่การคำนวณการวนซ้ำก่อนหน้าจะเสร็จสิ้น เมื่อเกิดข้อผิดพลาด OutOfRange การดำเนินการทั้งหมดในฟังก์ชันจะถูกยกเลิกทันที หากเป็นบางกรณีที่คุณต้องการหลีกเลี่ยง ทางเลือกอื่นที่ไม่เกิดข้อผิดพลาด OutOfRange คือ tf.distribute.DistributedIterator.get_next_as_optional()
get_next_as_optional
ส่งคืน tf.experimental.Optional
ซึ่งมีองค์ประกอบถัดไปหรือไม่มีค่าหาก tf.distribute.DistributedIterator
ถึงจุดสิ้นสุด
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') 2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_0" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9 } } attr { key: "metadata" value { s: "\n\020RangeDataset:104" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
การใช้คุณสมบัติ element_spec
หากคุณส่งผ่านองค์ประกอบของชุดข้อมูลที่แจกจ่ายไปยัง tf.function
และต้องการการรับประกัน tf.TypeSpec
คุณสามารถระบุอาร์กิวเมนต์ input_signature
ของ tf.function
ได้ เอาต์พุตของชุดข้อมูลแบบกระจายคือ tf.distribute.DistributedValues
ซึ่งสามารถแสดงอินพุตไปยังอุปกรณ์เดียวหรือหลายอุปกรณ์ ในการรับ tf.TypeSpec
ที่สอดคล้องกับค่าแบบกระจายนี้ คุณสามารถใช้คุณสมบัติ element_spec
ของชุดข้อมูลแบบกระจายหรืออ็อบเจ็กต์ iterator แบบกระจาย
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:122" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
แบทช์บางส่วน
พบแบทช์บางส่วนเมื่ออินสแตนซ์ tf.data.Dataset
ที่ผู้ใช้สร้างอาจมีขนาดแบทช์ที่ไม่แบ่งเท่า ๆ กันด้วยจำนวนของเรพพลิกา หรือเมื่อคาร์ดินาลลิตี้ของอินสแตนซ์ชุดข้อมูลไม่หารด้วยขนาดแบทช์ ซึ่งหมายความว่าเมื่อมีการแจกจ่ายชุดข้อมูลบนแบบจำลองหลายตัว การเรียก next
บนตัววนซ้ำบางตัวจะส่งผลให้เกิด OutOfRangeError เพื่อจัดการกับกรณีการใช้งานนี้ tf.distribute
จะส่งคืนแบตช์จำลองที่มีขนาดแบตช์ 0 บนเรพลิกาที่ไม่มีข้อมูลในการประมวลผลอีกต่อไป
สำหรับกรณีผู้ปฏิบัติงานคนเดียว หากไม่มีการส่งคืนข้อมูลโดยการโทร next
บนตัววนซ้ำ แบตช์จำลองที่มีขนาดแบตช์ 0 จะถูกสร้างและใช้งานพร้อมกับข้อมูลจริงในชุดข้อมูล ในกรณีของแบทช์บางส่วน แบทช์โกลบอลชุดสุดท้ายจะมีข้อมูลจริงควบคู่ไปกับแบทช์จำลองของข้อมูล เงื่อนไขการหยุดสำหรับการประมวลผลข้อมูลในขณะนี้จะตรวจสอบว่าเรพลิกาใด ๆ มีข้อมูลหรือไม่ หากไม่มีข้อมูลบนแบบจำลองใดๆ ข้อผิดพลาด OutOfRange จะถูกส่งออกไป
สำหรับกรณีที่มีผู้ปฏิบัติงานหลายคน ค่าบูลีนที่แสดงการมีอยู่ของข้อมูลในแต่ละผู้ปฏิบัติงานจะถูกรวมโดยใช้การสื่อสารข้ามเรพลิกา และค่านี้ใช้เพื่อระบุว่าผู้ปฏิบัติงานทั้งหมดประมวลผลชุดข้อมูลที่แจกจ่ายเสร็จแล้วหรือไม่ เนื่องจากสิ่งนี้เกี่ยวข้องกับการสื่อสารระหว่างผู้ปฏิบัติงาน จึงมีบทลงโทษด้านประสิทธิภาพบางอย่างที่เกี่ยวข้อง
คำเตือน
เมื่อใช้
tf.distribute.Strategy.experimental_distribute_dataset
APIs กับการตั้งค่าผู้ปฏิบัติงานหลายราย ผู้ใช้จะส่งtf.data.Dataset
ที่อ่านจากไฟล์ ถ้าtf.data.experimental.AutoShardPolicy
ถูกตั้งค่าเป็นAUTO
หรือFILE
ขนาดแบทช์ตามจริงต่อขั้นตอนอาจเล็กกว่าขนาดแบทช์ส่วนกลางที่ผู้ใช้กำหนด สิ่งนี้สามารถเกิดขึ้นได้เมื่อองค์ประกอบที่เหลือในไฟล์น้อยกว่าขนาดแบตช์ส่วนกลาง ผู้ใช้สามารถทำให้ชุดข้อมูลหมดโดยไม่ต้องขึ้นอยู่กับจำนวนขั้นตอนในการรัน หรือตั้งค่าtf.data.experimental.AutoShardPolicy
เป็นDATA
เพื่อแก้ไขขณะนี้ยังไม่รองรับการแปลงชุดข้อมูลแบบเก็บสถานะด้วย
tf.distribute
และการดำเนินการเก็บสถานะใดๆ ที่ชุดข้อมูลอาจมีจะถูกละเว้น ตัวอย่างเช่น หากชุดข้อมูลของคุณมีmap_fn
ที่ใช้tf.random.uniform
เพื่อหมุนภาพ แสดงว่าคุณมีกราฟชุดข้อมูลที่ขึ้นอยู่กับสถานะ (เช่น การสุ่มเมล็ด) บนเครื่องในพื้นที่ที่มีการดำเนินการกระบวนการหลามtf.data.experimental.OptimizationOptions
ทดลองที่ถูกปิดใช้งานโดยค่าเริ่มต้นสามารถในบริบทบางอย่างได้ เช่น เมื่อใช้ร่วมกับtf.distribute
ทำให้ประสิทธิภาพลดลง คุณควรเปิดใช้งานหลังจากตรวจสอบว่ามีประโยชน์ต่อประสิทธิภาพของปริมาณงานของคุณในการตั้งค่าการกระจายเท่านั้นโปรดดู คู่มือนี้ สำหรับวิธีเพิ่มประสิทธิภาพไพพ์ไลน์อินพุตของคุณด้วย
tf.data
โดยทั่วไป เคล็ดลับเพิ่มเติมบางประการ:หากคุณมีผู้ปฏิบัติงานหลายคนและกำลังใช้
tf.data.Dataset.list_files
เพื่อสร้างชุดข้อมูลจากไฟล์ทั้งหมดที่ตรงกับรูปแบบโกลบอลอย่างน้อยหนึ่งรูปแบบ อย่าลืมตั้งค่าอาร์กิวเมนต์seed
หรือตั้งค่าshuffle=False
เพื่อให้ผู้ปฏิบัติงานแต่ละคนแบ่งไฟล์อย่างสม่ำเสมอหากไปป์ไลน์อินพุตของคุณมีทั้งการสับเปลี่ยนข้อมูลในระดับเร็กคอร์ดและการแยกวิเคราะห์ข้อมูล เว้นแต่ข้อมูลที่ไม่ถูกแยกวิเคราะห์จะมีขนาดใหญ่กว่าข้อมูลที่แยกวิเคราะห์อย่างมีนัยสำคัญ (ซึ่งโดยปกติไม่ใช่กรณี) สับเปลี่ยนก่อนแล้วจึงแยกวิเคราะห์ ดังที่แสดงในตัวอย่างต่อไปนี้ ซึ่งอาจเป็นประโยชน์ต่อการใช้งานหน่วยความจำและประสิทธิภาพ
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
รักษาบัฟเฟอร์ภายในขององค์ประกอบbuffer_size
และด้วยเหตุนี้การลดbuffer_size
อาจช่วยบรรเทาปัญหา OOM ได้ไม่รับประกันลำดับที่ข้อมูลถูกประมวลผลโดยผู้ปฏิบัติงานเมื่อใช้
tf.distribute.experimental_distribute_dataset
หรือtf.distribute.distribute_datasets_from_function
โดยทั่วไปจำเป็นต้องใช้หากคุณใช้tf.distribute
เพื่อคาดการณ์มาตราส่วน อย่างไรก็ตาม คุณสามารถแทรกดัชนีสำหรับแต่ละองค์ประกอบในแบทช์และเอาต์พุตของคำสั่งซื้อได้ ตัวอย่างต่อไปนี้คือตัวอย่างวิธีการสั่งซื้อเอาต์พุต
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46} 2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_4" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9223372036854775807 } } attr { key: "metadata" value { s: "\n\020RangeDataset:162" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } }
ฉันจะแจกจ่ายข้อมูลของฉันได้อย่างไรหากฉันไม่ได้ใช้อินสแตนซ์ tf.data.Dataset ที่เป็นที่ยอมรับ
ในบางครั้ง ผู้ใช้ไม่สามารถใช้ tf.data.Dataset
เพื่อแทนอินพุตของตน และต่อมาใช้ API ที่กล่าวถึงข้างต้นเพื่อแจกจ่ายชุดข้อมูลไปยังอุปกรณ์หลายเครื่อง ในกรณีเช่นนี้ คุณสามารถใช้เมตริกซ์ดิบหรืออินพุตจากเครื่องกำเนิดไฟฟ้าได้
ใช้ Experimental_distribute_values_from_function สำหรับอินพุตเทนเซอร์ตามอำเภอใจ
strategy.run
ยอมรับ tf.distribute.DistributedValues
ซึ่งเป็นผลลัพธ์ของ next(iterator)
ในการส่งผ่านค่าเทนเซอร์ ให้ใช้ experimental_distribute_values_from_function
เพื่อสร้าง tf.distribute.DistributedValues
จากเมตริกซ์ดิบ
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)ตัวยึดตำแหน่ง23
ใช้ tf.data.Dataset.from_generator หากอินพุตของคุณมาจากตัวสร้าง
หากคุณมีฟังก์ชันตัวสร้างที่คุณต้องการใช้ คุณสามารถสร้างอินสแตนซ์ tf.data.Dataset
โดยใช้ from_generator
API
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2" op: "FlatMapDataset" input: "TensorDataset/_1" attr { key: "Targuments" value { list { } } } attr { key: "_cardinality" value { i: -2 } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_flat_map_fn_3980" } } } attr { key: "metadata" value { s: "\n\022FlatMapDataset:178" } } attr { key: "output_shapes" value { list { shape { dim { size: 4 } } } } } attr { key: "output_types" value { list { type: DT_FLOAT } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.