ดูบน TensorFlow.org | ทำงานใน Google Colab | ดูแหล่งที่มาบน GitHub | ดาวน์โหลดโน๊ตบุ๊ค |
ภาพรวม
GPU และ TPU สามารถลดเวลาที่ต้องใช้ในการดำเนินการขั้นตอนการฝึกเพียงขั้นตอนเดียวได้อย่างมาก การบรรลุประสิทธิภาพสูงสุดต้องใช้ไปป์ไลน์อินพุตที่มีประสิทธิภาพซึ่งส่งข้อมูลสำหรับขั้นตอนถัดไปก่อนที่ขั้นตอนปัจจุบันจะเสร็จสิ้น tf.data
API ช่วยสร้างไปป์ไลน์อินพุตที่ยืดหยุ่นและมีประสิทธิภาพ เอกสารนี้สาธิตวิธีใช้ tf.data
API เพื่อสร้างไปป์ไลน์อินพุต TensorFlow ที่มีประสิทธิภาพสูง
ก่อนที่คุณจะดำเนินการต่อ ให้ตรวจสอบคำแนะนำ ไปป์ไลน์อินพุต Build TensorFlow เพื่อเรียนรู้วิธีใช้ tf.data
API
ทรัพยากร
ติดตั้ง
import tensorflow as tf
import time
ในคู่มือนี้ คุณจะทำซ้ำในชุดข้อมูลและวัดประสิทธิภาพ การสร้างมาตรฐานประสิทธิภาพที่ทำซ้ำได้อาจเป็นเรื่องยาก ปัจจัยต่างๆ ที่ส่งผลต่อความสามารถในการทำซ้ำ ได้แก่
- โหลด CPU ปัจจุบัน
- ปริมาณการใช้เครือข่าย
- กลไกที่ซับซ้อน เช่น cache
เพื่อให้ได้เกณฑ์มาตรฐานที่ทำซ้ำได้ คุณจะต้องสร้างตัวอย่างเทียม
ชุดข้อมูล
เริ่มต้นด้วยการกำหนดคลาสที่สืบทอดจาก tf.data.Dataset
เรียกว่า ArtificialDataset
ชุดข้อมูลนี้:
- สร้าง
num_samples
ตัวอย่าง (ค่าเริ่มต้นคือ 3) - พักสักครู่ก่อนรายการแรกเพื่อจำลองการเปิดไฟล์
- พักสักครู่ก่อนที่จะสร้างแต่ละรายการเพื่อจำลองการอ่านข้อมูลจากไฟล์
class ArtificialDataset(tf.data.Dataset):
def _generator(num_samples):
# Opening the file
time.sleep(0.03)
for sample_idx in range(num_samples):
# Reading data (line, record) from the file
time.sleep(0.015)
yield (sample_idx,)
def __new__(cls, num_samples=3):
return tf.data.Dataset.from_generator(
cls._generator,
output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
args=(num_samples,)
)
ชุดข้อมูลนี้คล้ายกับชุดข้อมูล tf.data.Dataset.range
โดยเพิ่มการหน่วงเวลาคงที่ที่จุดเริ่มต้นและระหว่างแต่ละตัวอย่าง
วงการฝึก
ถัดไป เขียนลูปการฝึกจำลองที่วัดระยะเวลาที่ใช้ในการวนซ้ำชุดข้อมูล เวลาฝึกอบรมเป็นแบบจำลอง
def benchmark(dataset, num_epochs=2):
start_time = time.perf_counter()
for epoch_num in range(num_epochs):
for sample in dataset:
# Performing a training step
time.sleep(0.01)
print("Execution time:", time.perf_counter() - start_time)
เพิ่มประสิทธิภาพการทำงาน
หากต้องการแสดงวิธีการเพิ่มประสิทธิภาพ คุณจะต้องปรับปรุงประสิทธิภาพของ ArtificialDataset
วิธีการไร้เดียงสา
เริ่มต้นด้วยไปป์ไลน์ที่ไร้เดียงสาโดยไม่ต้องใช้ลูกเล่น วนซ้ำชุดข้อมูลตามที่เป็นอยู่
benchmark(ArtificialDataset())
Execution time: 0.26497629899995445
ภายใต้ประทุน นี่คือวิธีใช้เวลาดำเนินการของคุณ:
โครงเรื่องแสดงให้เห็นว่าการทำขั้นตอนการฝึกอบรมเกี่ยวข้องกับ:
- การเปิดไฟล์หากยังไม่ได้เปิด
- กำลังดึงข้อมูลรายการจากไฟล์
- การใช้ข้อมูลในการฝึกอบรม
อย่างไรก็ตาม ในการใช้งานซิงโครนัสแบบไร้เดียงสาเช่นนี้ ในขณะที่ไปป์ไลน์ของคุณกำลังดึงข้อมูล โมเดลของคุณไม่ได้ใช้งาน ในทางกลับกัน ในขณะที่โมเดลของคุณกำลังฝึก ไปป์ไลน์อินพุตจะไม่ได้ใช้งาน เวลาของขั้นตอนการฝึกอบรมจึงเป็นผลรวมของเวลาเปิด การอ่าน และเวลาฝึกอบรม
ส่วนถัดไปสร้างขึ้นบนไปป์ไลน์อินพุตนี้ ซึ่งแสดงให้เห็นแนวทางปฏิบัติที่ดีที่สุดสำหรับการออกแบบไปป์ไลน์อินพุต TensorFlow ที่มีประสิทธิภาพ
กำลังดึงข้อมูลล่วงหน้า
การดึงข้อมูลล่วงหน้าทับซ้อนการประมวลผลล่วงหน้าและการดำเนินการแบบจำลองของขั้นตอนการฝึกอบรม ขณะที่โมเดลกำลังดำเนินการขั้นตอนการฝึก ไปป์ไลน์อินพุตกำลังอ่านข้อมูลสำหรับขั้นตอน s
s+1
การทำเช่นนี้จะลดเวลาขั้นตอนให้เหลือสูงสุด (เมื่อเทียบกับผลรวม) ของการฝึก และเวลาที่ใช้ในการดึงข้อมูล
tf.data
API จัดเตรียมการแปลง tf.data.Dataset.prefetch
สามารถใช้เพื่อแยกเวลาที่ข้อมูลถูกสร้างขึ้นจากเวลาที่มีการใช้ข้อมูล โดยเฉพาะอย่างยิ่ง การแปลงจะใช้เธรดพื้นหลังและบัฟเฟอร์ภายในเพื่อดึงองค์ประกอบล่วงหน้าจากชุดข้อมูลอินพุตล่วงหน้าก่อนเวลาที่ร้องขอ จำนวนองค์ประกอบที่จะดึงข้อมูลล่วงหน้าควรเท่ากับ (หรืออาจมากกว่า) จำนวนแบทช์ที่ใช้โดยขั้นตอนการฝึกเดียว คุณสามารถปรับแต่งค่านี้ด้วยตนเอง หรือตั้งค่าเป็น tf.data.AUTOTUNE
ซึ่งจะแจ้งรันไทม์ tf.data
เพื่อปรับแต่งค่าแบบไดนามิกที่รันไทม์
โปรดทราบว่าการแปลงการดึงข้อมูลล่วงหน้าให้ประโยชน์ทุกครั้งที่มีโอกาสทับซ้อนงานของ "ผู้ผลิต" กับงานของ "ผู้บริโภค"
benchmark(
ArtificialDataset()
.prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.21731788600027357
ตอนนี้ ตามที่แสดงแผนภาพเวลาการดำเนินการข้อมูล ในขณะที่ขั้นตอนการฝึกอบรมกำลังทำงานสำหรับตัวอย่าง 0 ไปป์ไลน์อินพุตกำลังอ่านข้อมูลสำหรับตัวอย่างที่ 1 เป็นต้น
การแยกข้อมูลแบบขนาน
ในการตั้งค่าจริง ข้อมูลที่ป้อนเข้าอาจถูกจัดเก็บจากระยะไกล (เช่น บน Google Cloud Storage หรือ HDFS) ไปป์ไลน์ชุดข้อมูลที่ทำงานได้ดีเมื่ออ่านข้อมูลในเครื่องอาจกลายเป็นคอขวดบน I/O เมื่ออ่านข้อมูลทางไกลเนื่องจากความแตกต่างระหว่างที่จัดเก็บในเครื่องและระยะไกล:
- Time-to-First-byte : การอ่านไบต์แรกของไฟล์จากที่เก็บข้อมูลระยะไกลอาจใช้เวลานานกว่าที่จัดเก็บในเครื่อง
- ปริมาณการอ่าน : แม้ว่าโดยทั่วไปแล้วที่เก็บข้อมูลระยะไกลจะมีแบนด์วิดท์รวมขนาดใหญ่ การอ่านไฟล์เดียวอาจใช้แบนด์วิดท์นี้ได้เพียงเล็กน้อยเท่านั้น
นอกจากนี้ เมื่อโหลดไบต์ดิบลงในหน่วยความจำแล้ว อาจจำเป็นต้องทำการดีซีเรียลไลซ์และ/หรือถอดรหัสข้อมูล (เช่น protobuf ) ซึ่งต้องมีการคำนวณเพิ่มเติม โอเวอร์เฮดนี้มีขึ้นโดยไม่คำนึงว่าข้อมูลจะถูกเก็บไว้ในเครื่องหรือจากระยะไกล แต่อาจแย่กว่านั้นในกรณีระยะไกลหากข้อมูลไม่ได้รับการดึงข้อมูลล่วงหน้าอย่างมีประสิทธิภาพ
เพื่อลดผลกระทบของโอเวอร์เฮดในการดึงข้อมูลต่างๆ การแปลง tf.data.Dataset.interleave
สามารถใช้เพื่อทำให้ขั้นตอนการโหลดข้อมูลขนานกัน โดยแทรกเนื้อหาของชุดข้อมูลอื่นๆ (เช่น ตัวอ่านไฟล์ข้อมูล) จำนวนชุดข้อมูลที่จะทับซ้อนกันสามารถระบุได้โดยอาร์กิวเมนต์ cycle_length
ในขณะที่ระดับของการขนานสามารถระบุได้โดยอาร์กิวเมนต์ num_parallel_calls
คล้ายกับการแปลง prefetch
การแปลง interleave
รองรับ tf.data.AUTOTUNE
ซึ่งจะมอบหมายการตัดสินใจเกี่ยวกับระดับของการขนานที่จะใช้กับรันไทม์ tf.data
อินเตอร์ลีฟตามลำดับ
อาร์กิวเมนต์เริ่มต้นของการแปลง tf.data.Dataset.interleave
ทำให้สามารถแทรกตัวอย่างเดี่ยวจากชุดข้อมูลสองชุดตามลำดับ
benchmark(
tf.data.Dataset.range(2)
.interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4987426460002098
พล็อตเวลาดำเนินการข้อมูลนี้ช่วยให้สามารถแสดงพฤติกรรมของการแปลง interleave
ลีฟ โดยดึงตัวอย่างจากชุดข้อมูลสองชุดที่มีอยู่ อย่างไรก็ตาม ไม่มีการปรับปรุงประสิทธิภาพที่เกี่ยวข้องที่นี่
อินเตอร์ลีฟแบบขนาน
ตอนนี้ ใช้อาร์กิวเมนต์ num_parallel_calls
ของการแปลง interleave
ลีฟ ซึ่งจะโหลดชุดข้อมูลหลายชุดพร้อมกัน ช่วยลดเวลาในการรอเปิดไฟล์
benchmark(
tf.data.Dataset.range(2)
.interleave(
lambda _: ArtificialDataset(),
num_parallel_calls=tf.data.AUTOTUNE
)
)
Execution time: 0.283668874000341
คราวนี้ ตามที่แสดงแผนภาพเวลาการดำเนินการข้อมูล การอ่านชุดข้อมูลทั้งสองแบบจะขนานกัน ช่วยลดเวลาในการประมวลผลข้อมูลทั่วโลก
การแปลงข้อมูลแบบขนาน
เมื่อเตรียมข้อมูล องค์ประกอบอินพุตอาจต้องได้รับการประมวลผลล่วงหน้า ด้วยเหตุนี้ tf.data
API จึงเสนอการแปลง tf.data.Dataset.map
ซึ่งใช้ฟังก์ชันที่ผู้ใช้กำหนดเองกับแต่ละองค์ประกอบของชุดข้อมูลอินพุต เนื่องจากองค์ประกอบอินพุตไม่แยกจากกัน การประมวลผลล่วงหน้าจึงสามารถขนานกันข้ามคอร์ของ CPU หลายตัวได้ เพื่อให้เป็นไปได้ เช่นเดียวกับการแปลง prefetch
และการแปลง interleave
ลีฟ การแปลง map
จัดให้มีอาร์กิวเมนต์ num_parallel_calls
เพื่อระบุระดับของการขนานกัน
การเลือกค่าที่ดีที่สุดสำหรับอาร์กิวเมนต์ num_parallel_calls
ขึ้นอยู่กับฮาร์ดแวร์ของคุณ ลักษณะของข้อมูลการฝึกของคุณ (เช่น ขนาดและรูปร่าง) ค่าใช้จ่ายของฟังก์ชันแผนที่ของคุณ และการประมวลผลอื่นๆ ที่เกิดขึ้นบน CPU ในเวลาเดียวกัน ฮิวริสติกอย่างง่ายคือการใช้จำนวนคอร์ของ CPU ที่มีอยู่ อย่างไรก็ตาม สำหรับการแปลง prefetch
และการแปลง interleave
ลีฟ การแปลง map
รองรับ tf.data.AUTOTUNE
ซึ่งจะมอบหมายการตัดสินใจเกี่ยวกับระดับของการขนานที่จะใช้กับรันไทม์ tf.data
def mapped_function(s):
# Do some hard pre-processing
tf.py_function(lambda: time.sleep(0.03), [], ())
return s
การทำแผนที่ตามลำดับ
เริ่มต้นด้วยการใช้การแปลง map
โดยไม่มีความขนานเป็นตัวอย่างพื้นฐาน
benchmark(
ArtificialDataset()
.map(mapped_function)
)
Execution time: 0.4505277170001136
สำหรับ แนวทางไร้เดียงสา ในที่นี้ ตามที่พล็อตแสดง เวลาที่ใช้ในการเปิด การอ่าน การประมวลผลล่วงหน้า (การทำแผนที่) และขั้นตอนการฝึกอบรมรวมกันสำหรับการทำซ้ำครั้งเดียว
การทำแผนที่คู่ขนาน
ตอนนี้ ใช้ฟังก์ชันก่อนการประมวลผลเดียวกันแต่ใช้พร้อมกันกับตัวอย่างหลายรายการ
benchmark(
ArtificialDataset()
.map(
mapped_function,
num_parallel_calls=tf.data.AUTOTUNE
)
)
Execution time: 0.2839677860001757
ตามที่แสดงแผนภาพข้อมูล ขั้นตอนก่อนการประมวลผลจะทับซ้อนกัน ซึ่งช่วยลดเวลาโดยรวมสำหรับการทำซ้ำครั้งเดียว
เก็บเอาไว้
การแปลง tf.data.Dataset.cache
สามารถแคชชุดข้อมูล ทั้งในหน่วยความจำหรือที่จัดเก็บในเครื่อง การดำเนินการนี้จะช่วยประหยัดการดำเนินการบางอย่าง (เช่น การเปิดไฟล์และการอ่านข้อมูล) ไม่ให้ดำเนินการในแต่ละยุค
benchmark(
ArtificialDataset()
.map( # Apply time consuming operations before cache
mapped_function
).cache(
),
5
)
Execution time: 0.3848854380003104
ในที่นี้ พล็อตเวลาดำเนินการข้อมูลแสดงให้เห็นว่าเมื่อคุณแคชชุดข้อมูล การแปลงก่อน cache
(เช่น การเปิดไฟล์และการอ่านข้อมูล) จะดำเนินการในช่วงยุคแรกเท่านั้น ยุคถัดไปจะนำข้อมูลที่แคชไว้โดยการแปลง cache
มาใช้ซ้ำ
หากฟังก์ชันที่ผู้ใช้กำหนดเองที่ส่งผ่านไปยังการแปลง map
มีราคาแพง ให้ใช้การแปลง cache
หลังจากการแปลง map
ตราบเท่าที่ชุดข้อมูลที่เป็นผลลัพธ์ยังสามารถใส่ลงในหน่วยความจำหรือที่จัดเก็บในเครื่องได้ หากฟังก์ชันที่ผู้ใช้กำหนดเองเพิ่มพื้นที่ที่จำเป็นในการจัดเก็บชุดข้อมูลเกินความจุของแคช ให้นำไปใช้หลังการแปลง cache
หรือพิจารณาประมวลผลข้อมูลของคุณล่วงหน้าก่อนงานฝึกอบรมเพื่อลดการใช้ทรัพยากร
การทำแผนที่เวกเตอร์
การเรียกใช้ฟังก์ชันที่ผู้ใช้กำหนดเองที่ส่งผ่านไปยังการแปลง map
มีค่าใช้จ่ายที่เกี่ยวข้องกับการตั้งเวลาและการดำเนินการฟังก์ชันที่ผู้ใช้กำหนดเอง ทำให้ฟังก์ชันที่ผู้ใช้กำหนดเองเป็นภาพเวกเตอร์ (กล่าวคือ ให้ทำงานผ่านชุดอินพุตพร้อมกัน) และใช้การแปลง batch
ก่อน การแปลง map
เพื่อแสดงให้เห็นแนวทางปฏิบัติที่ดีนี้ ชุดข้อมูลเทียมของคุณไม่เหมาะ ความล่าช้าในการจัดกำหนดการอยู่ที่ประมาณ 10 ไมโครวินาที (10e-6 วินาที) ซึ่งน้อยกว่าหลายสิบมิลลิวินาทีที่ใช้ใน ArtificialDataset
มาก ดังนั้นจึงมองเห็นได้ยาก
สำหรับตัวอย่างนี้ ให้ใช้ฟังก์ชันฐาน tf.data.Dataset.range
และทำให้ลูปการฝึกง่ายขึ้นเป็นรูปแบบที่ง่ายที่สุด
fast_dataset = tf.data.Dataset.range(10000)
def fast_benchmark(dataset, num_epochs=2):
start_time = time.perf_counter()
for _ in tf.data.Dataset.range(num_epochs):
for _ in dataset:
pass
tf.print("Execution time:", time.perf_counter() - start_time)
def increment(x):
return x+1
การทำแผนที่สเกลาร์
fast_benchmark(
fast_dataset
# Apply function one item at a time
.map(increment)
# Batch
.batch(256)
)
Execution time: 0.2712608739998359
พล็อตด้านบนแสดงให้เห็นสิ่งที่เกิดขึ้น (โดยมีตัวอย่างน้อยกว่า) โดยใช้วิธีการแมปสเกลาร์ แสดงว่ามีการใช้ฟังก์ชันที่แมปสำหรับแต่ละตัวอย่าง แม้ว่าฟังก์ชันนี้จะเร็วมาก แต่ก็มีค่าใช้จ่ายบางอย่างที่ส่งผลต่อประสิทธิภาพของเวลา
การทำแผนที่แบบเวกเตอร์
fast_benchmark(
fast_dataset
.batch(256)
# Apply function on a batch of items
# The tf.Tensor.__add__ method already handle batches
.map(increment)
)
Execution time: 0.02737950600021577ตัวยึดตำแหน่ง23
คราวนี้ ฟังก์ชันที่แมปถูกเรียกหนึ่งครั้งและนำไปใช้กับกลุ่มตัวอย่าง ตามที่พล็อตเวลาดำเนินการข้อมูลแสดง ในขณะที่ฟังก์ชันอาจใช้เวลาในการดำเนินการมากกว่า โอเวอร์เฮดจะปรากฏขึ้นเพียงครั้งเดียว ซึ่งช่วยปรับปรุงประสิทธิภาพของเวลาโดยรวม
ลดรอยเท้าหน่วยความจำ
การแปลงจำนวนหนึ่ง รวมถึง interleave
, prefetch
และ shuffle
จะรักษาบัฟเฟอร์ภายในขององค์ประกอบ หากฟังก์ชันที่ผู้ใช้กำหนดเองส่งผ่านไปยังการแปลง map
เปลี่ยนขนาดขององค์ประกอบ ลำดับของการแปลงแผนที่และการแปลงที่องค์ประกอบบัฟเฟอร์จะส่งผลต่อการใช้หน่วยความจำ โดยทั่วไป เลือกลำดับที่ส่งผลให้พื้นที่หน่วยความจำลดลง เว้นแต่จะต้องการลำดับที่แตกต่างกันสำหรับประสิทธิภาพ
การคำนวณแคชบางส่วน
ขอแนะนำให้แคชชุดข้อมูลหลังการแปลง map
ยกเว้นในกรณีที่การแปลงนี้ทำให้ข้อมูลมีขนาดใหญ่เกินกว่าจะใส่ลงในหน่วยความจำได้ การแลกเปลี่ยนสามารถทำได้หากฟังก์ชั่นที่แมปของคุณแบ่งออกเป็นสองส่วน: ส่วนที่กินเวลาและส่วนที่ใช้หน่วยความจำ ในกรณีนี้ คุณสามารถโยงการเปลี่ยนแปลงของคุณได้ดังนี้:
dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)
ด้วยวิธีนี้ ส่วนที่ใช้เวลานานจะถูกดำเนินการในช่วงยุคแรกเท่านั้น และคุณหลีกเลี่ยงการใช้พื้นที่แคชมากเกินไป
สรุปแนวปฏิบัติที่ดีที่สุด
นี่คือบทสรุปของแนวทางปฏิบัติที่ดีที่สุดสำหรับการออกแบบไปป์ไลน์อินพุต TensorFlow ที่มีประสิทธิภาพ:
- ใช้การแปลงค่า
prefetch
เพื่อทับซ้อนงานของผู้ผลิตและผู้บริโภค - ทำการแปลงการอ่านข้อมูลแบบขนาน โดยใช้การแปลง
interleave
- ทำให้การแปลง
map
ขนานกัน โดยการตั้งค่าอาร์กิวเมนต์num_parallel_calls
- ใช้การแปลง
cache
เพื่อแคชข้อมูลในหน่วยความจำในช่วงยุคแรก - Vectorize ฟังก์ชั่นที่ผู้ใช้กำหนด ส่งผ่านไปยังการแปลง
map
- ลดการใช้หน่วยความจำ เมื่อใช้การแปลง
interleave
,prefetch
และshuffle
การจำลองตัวเลข
หากต้องการทำความเข้าใจ tf.data.Dataset
API ให้ลึกซึ้งยิ่งขึ้น คุณสามารถเล่นกับไปป์ไลน์ของคุณเองได้ ด้านล่างนี้คือรหัสที่ใช้ในการพล็อตรูปภาพจากคู่มือนี้ อาจเป็นจุดเริ่มต้นที่ดี โดยแสดงวิธีแก้ไขปัญหาเฉพาะหน้าสำหรับปัญหาทั่วไป เช่น:
- ความสามารถในการทำซ้ำเวลาดำเนินการ
- ฟังก์ชั่นที่แมปดำเนินการอย่างกระตือรือร้น
- การแปลง
interleave
callable
import itertools
from collections import defaultdict
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
ชุดข้อมูล
คล้ายกับ ArtificialDataset
คุณสามารถสร้างชุดข้อมูลที่ส่งคืนเวลาที่ใช้ในแต่ละขั้นตอน
class TimeMeasuredDataset(tf.data.Dataset):
# OUTPUT: (steps, timings, counters)
OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))
_INSTANCES_COUNTER = itertools.count() # Number of datasets generated
_EPOCHS_COUNTER = defaultdict(itertools.count) # Number of epochs done for each dataset
def _generator(instance_idx, num_samples):
epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])
# Opening the file
open_enter = time.perf_counter()
time.sleep(0.03)
open_elapsed = time.perf_counter() - open_enter
for sample_idx in range(num_samples):
# Reading data (line, record) from the file
read_enter = time.perf_counter()
time.sleep(0.015)
read_elapsed = time.perf_counter() - read_enter
yield (
[("Open",), ("Read",)],
[(open_enter, open_elapsed), (read_enter, read_elapsed)],
[(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
)
open_enter, open_elapsed = -1., -1. # Negative values will be filtered
def __new__(cls, num_samples=3):
return tf.data.Dataset.from_generator(
cls._generator,
output_types=cls.OUTPUT_TYPES,
output_shapes=cls.OUTPUT_SHAPES,
args=(next(cls._INSTANCES_COUNTER), num_samples)
)
ชุดข้อมูลนี้แสดงตัวอย่างรูปร่าง [[2, 1], [2, 2], [2, 3]]
และประเภท [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32]
แต่ละตัวอย่างคือ:
(
[("Open"), ("Read")],
[(t0, d), (t0, d)],
[(i, e, -1), (i, e, s)]
)
ที่ไหน:
-
Open
และRead
เป็นตัวระบุขั้นตอน -
t0
คือการประทับเวลาเมื่อขั้นตอนที่เกี่ยวข้องเริ่มต้นขึ้น -
d
คือเวลาที่ใช้ในขั้นตอนที่เกี่ยวข้อง -
i
เป็นดัชนีตัวอย่าง -
e
คือดัชนียุค (จำนวนครั้งที่มีการทำซ้ำชุดข้อมูล) -
s
คือดัชนีตัวอย่าง
วงวนซ้ำ
ทำให้การวนซ้ำซับซ้อนขึ้นเล็กน้อยเพื่อรวมการกำหนดเวลาทั้งหมด สิ่งนี้จะใช้ได้กับชุดข้อมูลที่สร้างตัวอย่างตามรายละเอียดด้านบนเท่านั้น
def timelined_benchmark(dataset, num_epochs=2):
# Initialize accumulators
steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)
start_time = time.perf_counter()
for epoch_num in range(num_epochs):
epoch_enter = time.perf_counter()
for (steps, times, values) in dataset:
# Record dataset preparation informations
steps_acc = tf.concat((steps_acc, steps), axis=0)
times_acc = tf.concat((times_acc, times), axis=0)
values_acc = tf.concat((values_acc, values), axis=0)
# Simulate training time
train_enter = time.perf_counter()
time.sleep(0.01)
train_elapsed = time.perf_counter() - train_enter
# Record training informations
steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
values_acc = tf.concat((values_acc, [values[-1]]), axis=0)
epoch_elapsed = time.perf_counter() - epoch_enter
# Record epoch informations
steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
time.sleep(0.001)
tf.print("Execution time:", time.perf_counter() - start_time)
return {"steps": steps_acc, "times": times_acc, "values": values_acc}
วิธีการวางแผน
สุดท้าย ให้กำหนดฟังก์ชันที่สามารถพล็อตไทม์ไลน์โดยให้ค่าที่ส่งคืนโดยฟังก์ชัน timelined_benchmark
def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
# Remove invalid entries (negative times, or empty steps) from the timelines
invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
steps = timeline['steps'][invalid_mask].numpy()
times = timeline['times'][invalid_mask].numpy()
values = timeline['values'][invalid_mask].numpy()
# Get a set of different steps, ordered by the first time they are encountered
step_ids, indices = np.stack(np.unique(steps, return_index=True))
step_ids = step_ids[np.argsort(indices)]
# Shift the starting time to 0 and compute the maximal time value
min_time = times[:,0].min()
times[:,0] = (times[:,0] - min_time)
end = max(width, (times[:,0]+times[:,1]).max() + 0.01)
cmap = mpl.cm.get_cmap("plasma")
plt.close()
fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
fig.suptitle(title)
fig.set_size_inches(17.0, len(step_ids))
plt.xlim(-0.01, end)
for i, step in enumerate(step_ids):
step_name = step.decode()
ax = axs[i]
ax.set_ylabel(step_name)
ax.set_ylim(0, 1)
ax.set_yticks([])
ax.set_xlabel("time (s)")
ax.set_xticklabels([])
ax.grid(which="both", axis="x", color="k", linestyle=":")
# Get timings and annotation for the given step
entries_mask = np.squeeze(steps==step)
serie = np.unique(times[entries_mask], axis=0)
annotations = values[entries_mask]
ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
if annotate:
for j, (start, width) in enumerate(serie):
annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
horizontalalignment='left', verticalalignment='center')
if save:
plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")
ใช้ wrappers สำหรับฟังก์ชันที่แมป
ในการเรียกใช้ฟังก์ชันที่แมปในบริบทที่ต้องการ คุณต้องรวมไว้ในการเรียก tf.py_function
def map_decorator(func):
def wrapper(steps, times, values):
# Use a tf.py_function to prevent auto-graph from compiling the method
return tf.py_function(
func,
inp=(steps, times, values),
Tout=(steps.dtype, times.dtype, values.dtype)
)
return wrapper
การเปรียบเทียบท่อ
_batch_map_num_items = 50
def dataset_generator_fun(*args):
return TimeMeasuredDataset(num_samples=_batch_map_num_items)
ไร้เดียงสา
@map_decorator
def naive_map(steps, times, values):
map_enter = time.perf_counter()
time.sleep(0.001) # Time consuming step
time.sleep(0.0001) # Memory consuming step
map_elapsed = time.perf_counter() - map_enter
return (
tf.concat((steps, [["Map"]]), axis=0),
tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
tf.concat((values, [values[-1]]), axis=0)
)
naive_timeline = timelined_benchmark(
tf.data.Dataset.range(2)
.flat_map(dataset_generator_fun)
.map(naive_map)
.batch(_batch_map_num_items, drop_remainder=True)
.unbatch(),
5
)
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version. Instructions for updating: Use output_signature instead WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version. Instructions for updating: Use output_signature instead Execution time: 13.13538893499981ตัวยึดตำแหน่ง33
เพิ่มประสิทธิภาพ
@map_decorator
def time_consuming_map(steps, times, values):
map_enter = time.perf_counter()
time.sleep(0.001 * values.shape[0]) # Time consuming step
map_elapsed = time.perf_counter() - map_enter
return (
tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
)
@map_decorator
def memory_consuming_map(steps, times, values):
map_enter = time.perf_counter()
time.sleep(0.0001 * values.shape[0]) # Memory consuming step
map_elapsed = time.perf_counter() - map_enter
# Use tf.tile to handle batch dimension
return (
tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
)
optimized_timeline = timelined_benchmark(
tf.data.Dataset.range(2)
.interleave( # Parallelize data reading
dataset_generator_fun,
num_parallel_calls=tf.data.AUTOTUNE
)
.batch( # Vectorize your mapped function
_batch_map_num_items,
drop_remainder=True)
.map( # Parallelize map transformation
time_consuming_map,
num_parallel_calls=tf.data.AUTOTUNE
)
.cache() # Cache data
.map( # Reduce memory usage
memory_consuming_map,
num_parallel_calls=tf.data.AUTOTUNE
)
.prefetch( # Overlap producer and consumer works
tf.data.AUTOTUNE
)
.unbatch(),
5
)
Execution time: 6.723691489999965
draw_timeline(naive_timeline, "Naive", 15)
draw_timeline(optimized_timeline, "Optimized", 15)