TensorFlow.org पर देखें | Google Colab में चलाएं | GitHub पर स्रोत देखें | नोटबुक डाउनलोड करें |
अवलोकन
GPU और TPU एकल प्रशिक्षण चरण को निष्पादित करने के लिए आवश्यक समय को मौलिक रूप से कम कर सकते हैं। चरम प्रदर्शन प्राप्त करने के लिए एक कुशल इनपुट पाइपलाइन की आवश्यकता होती है जो वर्तमान चरण के समाप्त होने से पहले अगले चरण के लिए डेटा वितरित करती है। tf.data
API लचीली और कुशल इनपुट पाइपलाइन बनाने में मदद करता है। यह दस्तावेज़ दर्शाता है कि अत्यधिक प्रदर्शन करने वाली TensorFlow इनपुट पाइपलाइन बनाने के लिए tf.data
API का उपयोग कैसे करें।
जारी रखने से पहले, tf.data
API का उपयोग करने का तरीका जानने के लिए बिल्ड TensorFlow इनपुट पाइपलाइन गाइड देखें।
साधन
- TensorFlow इनपुट पाइपलाइन बनाएँ
-
tf.data.Dataset
API - TF प्रोफाइलर के साथ
tf.data
प्रदर्शन का विश्लेषण करें
सेट अप
import tensorflow as tf
import time
इस पूरी गाइड में, आप एक डेटासेट में पुनरावृति करेंगे और प्रदर्शन को मापेंगे। प्रतिलिपि प्रस्तुत करने योग्य प्रदर्शन बेंचमार्क बनाना मुश्किल हो सकता है। प्रजनन क्षमता को प्रभावित करने वाले विभिन्न कारकों में शामिल हैं:
- वर्तमान सीपीयू लोड
- नेटवर्क यातायात
- जटिल तंत्र, जैसे कैश
एक प्रतिलिपि प्रस्तुत करने योग्य बेंचमार्क प्राप्त करने के लिए, आप एक कृत्रिम उदाहरण का निर्माण करेंगे।
डेटासेट
ArtificialDataset
नामक tf.data.Dataset
से विरासत में मिली कक्षा को परिभाषित करने के साथ प्रारंभ करें। यह डेटासेट:
-
num_samples
नमूने उत्पन्न करता है (डिफ़ॉल्ट 3 है) - फ़ाइल खोलने के अनुकरण के लिए पहले आइटम से पहले कुछ समय के लिए सो जाता है
- फ़ाइल से डेटा पढ़ने का अनुकरण करने के लिए प्रत्येक आइटम को बनाने से पहले कुछ समय के लिए सो जाता है
class ArtificialDataset(tf.data.Dataset):
def _generator(num_samples):
# Opening the file
time.sleep(0.03)
for sample_idx in range(num_samples):
# Reading data (line, record) from the file
time.sleep(0.015)
yield (sample_idx,)
def __new__(cls, num_samples=3):
return tf.data.Dataset.from_generator(
cls._generator,
output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
args=(num_samples,)
)
यह डेटासेट tf.data.Dataset.range
एक के समान है, जो प्रत्येक नमूने की शुरुआत और बीच में एक निश्चित विलंब जोड़ता है।
ट्रेनिंग लूप
इसके बाद, एक डमी ट्रेनिंग लूप लिखें जो मापता है कि किसी डेटासेट पर पुनरावृति करने में कितना समय लगता है। प्रशिक्षण का समय सिम्युलेटेड है।
def benchmark(dataset, num_epochs=2):
start_time = time.perf_counter()
for epoch_num in range(num_epochs):
for sample in dataset:
# Performing a training step
time.sleep(0.01)
print("Execution time:", time.perf_counter() - start_time)
प्रदर्शन का अनुकूलन करें
यह प्रदर्शित करने के लिए कि प्रदर्शन को कैसे अनुकूलित किया जा सकता है, आप ArtificialDataset
के प्रदर्शन में सुधार करेंगे।
भोला दृष्टिकोण
बिना किसी तरकीब के एक भोले-भाले पाइपलाइन से शुरू करें, जैसा कि है, डेटासेट पर पुनरावृति।
benchmark(ArtificialDataset())
Execution time: 0.26497629899995445
हुड के तहत, आपका निष्पादन समय इस प्रकार व्यतीत हुआ:
कथानक से पता चलता है कि एक प्रशिक्षण चरण का प्रदर्शन करना शामिल है:
- फ़ाइल खोलना यदि वह अभी तक नहीं खोली गई है
- फ़ाइल से डेटा प्रविष्टि लाई जा रही है
- प्रशिक्षण के लिए डेटा का उपयोग करना
हालाँकि, यहाँ जैसे सरल तुल्यकालिक कार्यान्वयन में, जबकि आपकी पाइपलाइन डेटा ला रही है, आपका मॉडल बेकार बैठा है। इसके विपरीत, जब आपका मॉडल प्रशिक्षण ले रहा होता है, तो इनपुट पाइपलाइन बेकार बैठी होती है। प्रशिक्षण चरण समय इस प्रकार उद्घाटन, पढ़ने और प्रशिक्षण समय का योग है।
अगले खंड इस इनपुट पाइपलाइन का निर्माण करते हैं, जो प्रदर्शनकारी TensorFlow इनपुट पाइपलाइनों को डिजाइन करने के सर्वोत्तम अभ्यासों को दर्शाता है।
प्रीफेचिंग
प्रीफ़ेचिंग प्रशिक्षण चरण के प्रीप्रोसेसिंग और मॉडल निष्पादन को ओवरलैप करता है। जबकि मॉडल प्रशिक्षण चरण s
निष्पादित कर रहा है, इनपुट पाइपलाइन चरण s+1
के लिए डेटा पढ़ रही है। ऐसा करने से चरण का समय प्रशिक्षण के अधिकतम (योग के विपरीत) और डेटा निकालने में लगने वाले समय को कम कर देता है।
tf.data
API tf.data.Dataset.prefetch
परिवर्तन प्रदान करता है। इसका उपयोग उस समय को कम करने के लिए किया जा सकता है जब डेटा की खपत के समय से डेटा का उत्पादन किया जाता है। विशेष रूप से, परिवर्तन अनुरोधित समय से पहले इनपुट डेटासेट से तत्वों को प्रीफ़ेच करने के लिए पृष्ठभूमि थ्रेड और आंतरिक बफर का उपयोग करता है। प्रीफ़ेच करने के लिए तत्वों की संख्या एकल प्रशिक्षण चरण द्वारा खपत किए गए बैचों की संख्या के बराबर (या संभवतः इससे अधिक) होनी चाहिए। आप या तो इस मान को मैन्युअल रूप से ट्यून कर सकते हैं, या इसे tf.data.AUTOTUNE
पर सेट कर सकते हैं, जो tf.data
रनटाइम को रनटाइम पर गतिशील रूप से मान को ट्यून करने के लिए प्रेरित करेगा।
ध्यान दें कि प्रीफेच परिवर्तन किसी भी समय "उपभोक्ता" के काम के साथ "निर्माता" के काम को ओवरलैप करने का अवसर प्रदान करता है।
benchmark(
ArtificialDataset()
.prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.21731788600027357
अब, जैसा कि डेटा निष्पादन समय प्लॉट दिखाता है, जबकि नमूना 0 के लिए प्रशिक्षण चरण चल रहा है, इनपुट पाइपलाइन नमूना 1 के लिए डेटा पढ़ रही है, और इसी तरह।
समानांतर डेटा निष्कर्षण
वास्तविक दुनिया की सेटिंग में, इनपुट डेटा को दूरस्थ रूप से संग्रहीत किया जा सकता है (उदाहरण के लिए, Google क्लाउड स्टोरेज या एचडीएफएस पर)। एक डेटासेट पाइपलाइन जो स्थानीय रूप से डेटा पढ़ते समय अच्छी तरह से काम करती है, स्थानीय और दूरस्थ भंडारण के बीच निम्नलिखित अंतरों के कारण दूरस्थ रूप से डेटा पढ़ते समय I/O पर बाधा बन सकती है:
- टाइम-टू-फर्स्ट-बाइट : रिमोट स्टोरेज से फाइल की पहली बाइट को पढ़ने से स्थानीय स्टोरेज की तुलना में अधिक परिमाण के ऑर्डर लग सकते हैं।
- रीड थ्रूपुट : जबकि रिमोट स्टोरेज आमतौर पर बड़ी कुल बैंडविड्थ प्रदान करता है, एक फ़ाइल को पढ़ने से इस बैंडविड्थ के केवल एक छोटे से हिस्से का उपयोग करने में सक्षम हो सकता है।
इसके अलावा, एक बार जब रॉ बाइट्स को मेमोरी में लोड कर दिया जाता है, तो डेटा को डीसेरियलाइज़ करना और/या डिक्रिप्ट करना भी आवश्यक हो सकता है (जैसे प्रोटोबफ ), जिसके लिए अतिरिक्त गणना की आवश्यकता होती है। यह ओवरहेड मौजूद है चाहे डेटा स्थानीय रूप से या दूरस्थ रूप से संग्रहीत किया गया हो, लेकिन यदि डेटा को प्रभावी ढंग से प्रीफ़ेच नहीं किया जाता है तो दूरस्थ मामले में और भी खराब हो सकता है।
विभिन्न डेटा निष्कर्षण ओवरहेड्स के प्रभाव को कम करने के लिए, tf.data.Dataset.interleave
परिवर्तन का उपयोग डेटा लोडिंग चरण को समानांतर करने के लिए किया जा सकता है, अन्य डेटासेट (जैसे डेटा फ़ाइल रीडर) की सामग्री को इंटरलीव किया जा सकता है। ओवरलैप करने के लिए डेटासेट की संख्या cycle_length
तर्क द्वारा निर्दिष्ट की जा सकती है, जबकि समांतरता का स्तर num_parallel_calls
तर्क द्वारा निर्दिष्ट किया जा सकता है। prefetch
ट्रांसफ़ॉर्मेशन के समान, interleave
ट्रांसफ़ॉर्मेशन tf.data.AUTOTUNE
का समर्थन करता है, जो tf.data
रनटाइम के लिए किस स्तर की समानता का उपयोग करने के बारे में निर्णय को सौंपेगा।
अनुक्रमिक इंटरलीव
tf.data.Dataset.interleave
परिवर्तन के डिफ़ॉल्ट तर्क इसे क्रमिक रूप से दो डेटासेट से एकल नमूने इंटरलीव करते हैं।
benchmark(
tf.data.Dataset.range(2)
.interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4987426460002098
यह डेटा निष्पादन समय प्लॉट interleave
परिवर्तन के व्यवहार को प्रदर्शित करने की अनुमति देता है, उपलब्ध दो डेटासेट से वैकल्पिक रूप से नमूने प्राप्त करता है। हालांकि, यहां कोई प्रदर्शन सुधार शामिल नहीं है।
समानांतर इंटरलीव
अब, interleave
रूपांतरण के num_parallel_calls
तर्क का उपयोग करें। यह समानांतर में कई डेटासेट लोड करता है, जिससे फाइलों के खुलने की प्रतीक्षा में लगने वाला समय कम हो जाता है।
benchmark(
tf.data.Dataset.range(2)
.interleave(
lambda _: ArtificialDataset(),
num_parallel_calls=tf.data.AUTOTUNE
)
)
Execution time: 0.283668874000341
इस बार, जैसा कि डेटा निष्पादन समय प्लॉट दिखाता है, दो डेटासेट की रीडिंग समानांतर है, जिससे वैश्विक डेटा प्रोसेसिंग समय कम हो जाता है।
समानांतर डेटा परिवर्तन
डेटा तैयार करते समय, इनपुट तत्वों को पूर्व-संसाधित करने की आवश्यकता हो सकती है। इसके लिए, tf.data
API tf.data.Dataset.map
रूपांतरण प्रदान करता है, जो इनपुट डेटासेट के प्रत्येक तत्व के लिए उपयोगकर्ता द्वारा परिभाषित फ़ंक्शन को लागू करता है। चूंकि इनपुट तत्व एक दूसरे से स्वतंत्र होते हैं, इसलिए प्री-प्रोसेसिंग को कई सीपीयू कोर में समानांतर किया जा सकता है। इसे संभव बनाने के लिए, prefetch
और interleave
ट्रांसफॉर्मेशन के समान, map
ट्रांसफॉर्मेशन समांतरता के स्तर को निर्दिष्ट करने के लिए num_parallel_calls
तर्क प्रदान करता है।
num_parallel_calls
तर्क के लिए सर्वोत्तम मान चुनना आपके हार्डवेयर, आपके प्रशिक्षण डेटा की विशेषताओं (जैसे इसका आकार और आकार), आपके मानचित्र फ़ंक्शन की लागत, और सीपीयू पर एक ही समय में अन्य प्रसंस्करण क्या हो रहा है, पर निर्भर करता है। उपलब्ध सीपीयू कोर की संख्या का उपयोग करने के लिए एक साधारण अनुमानी है। हालांकि, जहां तक prefetch
और interleave
ट्रांसफ़ॉर्मेशन का सवाल है, map
ट्रांसफ़ॉर्मेशन tf.data.AUTOTUNE
को सपोर्ट करता है, जो tf.data
रनटाइम के लिए किस स्तर की समानता का उपयोग करने के बारे में निर्णय देगा।
def mapped_function(s):
# Do some hard pre-processing
tf.py_function(lambda: time.sleep(0.03), [], ())
return s
अनुक्रमिक मानचित्रण
आधारभूत उदाहरण के रूप में समानता के बिना map
परिवर्तन का उपयोग करके प्रारंभ करें।
benchmark(
ArtificialDataset()
.map(mapped_function)
)
Execution time: 0.4505277170001136
भोले दृष्टिकोण के लिए, यहाँ, जैसा कि कथानक दिखाता है, खोलने, पढ़ने, पूर्व-प्रसंस्करण (मानचित्रण) और प्रशिक्षण चरणों के लिए खर्च किए गए समय एक ही पुनरावृत्ति के लिए एक साथ योग करते हैं।
समानांतर मानचित्रण
अब, समान प्री-प्रोसेसिंग फ़ंक्शन का उपयोग करें लेकिन इसे कई नमूनों पर समानांतर में लागू करें।
benchmark(
ArtificialDataset()
.map(
mapped_function,
num_parallel_calls=tf.data.AUTOTUNE
)
)
Execution time: 0.2839677860001757
जैसा कि डेटा प्लॉट प्रदर्शित करता है, पूर्व-प्रसंस्करण चरण ओवरलैप होते हैं, जिससे एकल पुनरावृत्ति के लिए समग्र समय कम हो जाता है।
कैशिंग
tf.data.Dataset.cache
ट्रांसफ़ॉर्मेशन किसी डेटासेट को मेमोरी में या स्थानीय स्टोरेज पर कैश कर सकता है। यह प्रत्येक युग के दौरान निष्पादित होने से कुछ संचालन (जैसे फ़ाइल खोलने और डेटा पढ़ने) को बचाएगा।
benchmark(
ArtificialDataset()
.map( # Apply time consuming operations before cache
mapped_function
).cache(
),
5
)
Execution time: 0.3848854380003104
यहां, डेटा निष्पादन समय प्लॉट से पता चलता है कि जब आप किसी डेटासेट को कैश करते हैं, तो cache
एक से पहले के परिवर्तन (जैसे फ़ाइल खोलना और डेटा पढ़ना) केवल पहले युग के दौरान निष्पादित होते हैं। अगले युग cache
परिवर्तन द्वारा कैश किए गए डेटा का पुन: उपयोग करेंगे।
यदि map
परिवर्तन में पारित उपयोगकर्ता-परिभाषित फ़ंक्शन महंगा है, तो map
परिवर्तन के बाद cache
परिवर्तन को तब तक लागू करें जब तक कि परिणामी डेटासेट अभी भी मेमोरी या स्थानीय भंडारण में फिट हो सके। यदि उपयोगकर्ता-परिभाषित फ़ंक्शन कैश क्षमता से परे डेटासेट को संग्रहीत करने के लिए आवश्यक स्थान को बढ़ाता है, तो या तो इसे cache
परिवर्तन के बाद लागू करें या संसाधन उपयोग को कम करने के लिए अपने प्रशिक्षण कार्य से पहले अपने डेटा को पूर्व-संसाधित करने पर विचार करें।
वेक्टराइजिंग मैपिंग
map
परिवर्तन में पारित उपयोगकर्ता-परिभाषित फ़ंक्शन को लागू करने से उपयोगकर्ता द्वारा परिभाषित फ़ंक्शन को शेड्यूल करने और निष्पादित करने से संबंधित ओवरहेड होता है। उपयोगकर्ता-परिभाषित फ़ंक्शन को वेक्टराइज़ करें (अर्थात, क्या यह एक बार में इनपुट के एक बैच पर संचालित होता है) और map
परिवर्तन से पहले batch
परिवर्तन लागू करें।
इस अच्छे अभ्यास को स्पष्ट करने के लिए, आपका कृत्रिम डेटासेट उपयुक्त नहीं है। शेड्यूलिंग विलंब लगभग 10 माइक्रोसेकंड (10e-6 सेकंड) है, जो ArtificialDataset
में उपयोग किए गए दसियों मिलीसेकंड से बहुत कम है, और इस प्रकार इसका प्रभाव देखना मुश्किल है।
इस उदाहरण के लिए, आधार tf.data.Dataset.range
फ़ंक्शन का उपयोग करें और प्रशिक्षण लूप को उसके सरलतम रूप में सरल बनाएं।
fast_dataset = tf.data.Dataset.range(10000)
def fast_benchmark(dataset, num_epochs=2):
start_time = time.perf_counter()
for _ in tf.data.Dataset.range(num_epochs):
for _ in dataset:
pass
tf.print("Execution time:", time.perf_counter() - start_time)
def increment(x):
return x+1
अदिश मानचित्रण
fast_benchmark(
fast_dataset
# Apply function one item at a time
.map(increment)
# Batch
.batch(256)
)
Execution time: 0.2712608739998359
ऊपर दिया गया प्लॉट दिखाता है कि स्केलर मैपिंग पद्धति का उपयोग करके क्या हो रहा है (कम नमूनों के साथ)। यह दर्शाता है कि प्रत्येक नमूने के लिए मैप किए गए फ़ंक्शन को लागू किया जाता है। हालांकि यह फ़ंक्शन बहुत तेज़ है, इसमें कुछ ओवरहेड हैं जो समय के प्रदर्शन को प्रभावित करते हैं।
वेक्टरकृत मानचित्रण
fast_benchmark(
fast_dataset
.batch(256)
# Apply function on a batch of items
# The tf.Tensor.__add__ method already handle batches
.map(increment)
)
Execution time: 0.02737950600021577
इस बार, मैप किए गए फ़ंक्शन को एक बार कॉल किया जाता है और नमूने के बैच पर लागू होता है। जैसा कि डेटा निष्पादन समय प्लॉट दिखाता है, जबकि फ़ंक्शन को निष्पादित करने में अधिक समय लग सकता है, ओवरहेड केवल एक बार दिखाई देता है, समग्र समय प्रदर्शन में सुधार करता है।
स्मृति पदचिह्न को कम करना
interleave
, prefetch
और shuffle
सहित कई परिवर्तन, तत्वों के आंतरिक बफर को बनाए रखते हैं। यदि map
परिवर्तन में पारित उपयोगकर्ता-परिभाषित फ़ंक्शन तत्वों के आकार को बदलता है, तो मानचित्र परिवर्तन का क्रम और बफर तत्व स्मृति उपयोग को प्रभावित करते हैं। सामान्य तौर पर, उस क्रम को चुनें जिसके परिणामस्वरूप कम मेमोरी फ़ुटप्रिंट होता है, जब तक कि प्रदर्शन के लिए अलग-अलग ऑर्डरिंग वांछनीय न हो।
कैशिंग आंशिक गणना
map
परिवर्तन के बाद डेटासेट को कैश करने की अनुशंसा की जाती है, सिवाय इसके कि यह परिवर्तन डेटा को स्मृति में फ़िट होने के लिए बहुत बड़ा बना देता है। यदि आपके मैप किए गए फ़ंक्शन को दो भागों में विभाजित किया जा सकता है, तो एक ट्रेड-ऑफ प्राप्त किया जा सकता है: एक समय लेने वाला एक और एक मेमोरी लेने वाला भाग। इस मामले में, आप अपने परिवर्तनों को नीचे की तरह श्रृंखलाबद्ध कर सकते हैं:
dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)
इस तरह, समय लेने वाला हिस्सा केवल पहले युग के दौरान ही निष्पादित होता है, और आप बहुत अधिक कैश स्थान का उपयोग करने से बचते हैं।
सर्वोत्तम अभ्यास सारांश
प्रदर्शनकारी TensorFlow इनपुट पाइपलाइनों को डिजाइन करने के लिए सर्वोत्तम अभ्यासों का सारांश यहां दिया गया है:
- निर्माता और उपभोक्ता के काम को ओवरलैप करने के लिए
prefetch
ट्रांसफ़ॉर्मेशन का उपयोग करें -
interleave
ट्रांसफॉर्मेशन का उपयोग करके डेटा रीडिंग ट्रांसफॉर्मेशन को समानांतर करें -
num_parallel_calls
तर्क सेट करकेmap
परिवर्तन को समानांतर करें - पहले युग के दौरान मेमोरी में डेटा को कैश करने के लिए
cache
ट्रांसफ़ॉर्मेशन का उपयोग करें -
map
परिवर्तन में पारित उपयोगकर्ता-परिभाषित कार्यों को वेक्टर करें -
interleave
,prefetch
औरshuffle
ट्रांसफ़ॉर्मेशन लागू करते समय मेमोरी का उपयोग कम करें
आंकड़ों का पुनरुत्पादन
tf.data.Dataset
API समझ में गहराई तक जाने के लिए, आप अपनी खुद की पाइपलाइनों के साथ खेल सकते हैं। नीचे इस गाइड से छवियों को प्लॉट करने के लिए इस्तेमाल किया गया कोड है। सामान्य कठिनाइयों के लिए कुछ समाधान दिखाते हुए यह एक अच्छा प्रारंभिक बिंदु हो सकता है, जैसे:
- निष्पादन समय प्रतिलिपि प्रस्तुत करने योग्यता
- मैप किए गए कार्य उत्सुक निष्पादन
-
interleave
ट्रांसफॉर्मेशन कॉल करने योग्य
import itertools
from collections import defaultdict
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
डेटासेट
ArtificialDataset
के समान आप प्रत्येक चरण में बिताए गए समय को लौटाने वाला डेटासेट बना सकते हैं।
class TimeMeasuredDataset(tf.data.Dataset):
# OUTPUT: (steps, timings, counters)
OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))
_INSTANCES_COUNTER = itertools.count() # Number of datasets generated
_EPOCHS_COUNTER = defaultdict(itertools.count) # Number of epochs done for each dataset
def _generator(instance_idx, num_samples):
epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])
# Opening the file
open_enter = time.perf_counter()
time.sleep(0.03)
open_elapsed = time.perf_counter() - open_enter
for sample_idx in range(num_samples):
# Reading data (line, record) from the file
read_enter = time.perf_counter()
time.sleep(0.015)
read_elapsed = time.perf_counter() - read_enter
yield (
[("Open",), ("Read",)],
[(open_enter, open_elapsed), (read_enter, read_elapsed)],
[(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
)
open_enter, open_elapsed = -1., -1. # Negative values will be filtered
def __new__(cls, num_samples=3):
return tf.data.Dataset.from_generator(
cls._generator,
output_types=cls.OUTPUT_TYPES,
output_shapes=cls.OUTPUT_SHAPES,
args=(next(cls._INSTANCES_COUNTER), num_samples)
)
यह डेटासेट आकार [[2, 1], [2, 2], [2, 3]]
और प्रकार [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32]
के नमूने प्रदान करता है। प्रत्येक नमूना है:
(
[("Open"), ("Read")],
[(t0, d), (t0, d)],
[(i, e, -1), (i, e, s)]
)
कहां:
-
Open
औरRead
चरण पहचानकर्ता हैं -
t0
टाइमस्टैम्प है जब संबंधित चरण शुरू होता है -
d
संबंधित चरण में बिताया गया समय है -
i
उदाहरण सूचकांक है -
e
युग सूचकांक है (डेटासेट को पुनरावृत्त करने की संख्या) -
s
नमूना सूचकांक है
पुनरावृत्ति लूप
सभी समयों को एकत्रित करने के लिए पुनरावृत्ति लूप को थोड़ा और जटिल बनाएं। यह केवल ऊपर बताए अनुसार नमूने जनरेट करने वाले डेटासेट के साथ काम करेगा।
def timelined_benchmark(dataset, num_epochs=2):
# Initialize accumulators
steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)
start_time = time.perf_counter()
for epoch_num in range(num_epochs):
epoch_enter = time.perf_counter()
for (steps, times, values) in dataset:
# Record dataset preparation informations
steps_acc = tf.concat((steps_acc, steps), axis=0)
times_acc = tf.concat((times_acc, times), axis=0)
values_acc = tf.concat((values_acc, values), axis=0)
# Simulate training time
train_enter = time.perf_counter()
time.sleep(0.01)
train_elapsed = time.perf_counter() - train_enter
# Record training informations
steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
values_acc = tf.concat((values_acc, [values[-1]]), axis=0)
epoch_elapsed = time.perf_counter() - epoch_enter
# Record epoch informations
steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
time.sleep(0.001)
tf.print("Execution time:", time.perf_counter() - start_time)
return {"steps": steps_acc, "times": times_acc, "values": values_acc}
प्लॉटिंग विधि
अंत में, timelined_benchmark
फ़ंक्शन द्वारा लौटाए गए मानों को देखते हुए एक टाइमलाइन प्लॉट करने में सक्षम फ़ंक्शन को परिभाषित करें।
def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
# Remove invalid entries (negative times, or empty steps) from the timelines
invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
steps = timeline['steps'][invalid_mask].numpy()
times = timeline['times'][invalid_mask].numpy()
values = timeline['values'][invalid_mask].numpy()
# Get a set of different steps, ordered by the first time they are encountered
step_ids, indices = np.stack(np.unique(steps, return_index=True))
step_ids = step_ids[np.argsort(indices)]
# Shift the starting time to 0 and compute the maximal time value
min_time = times[:,0].min()
times[:,0] = (times[:,0] - min_time)
end = max(width, (times[:,0]+times[:,1]).max() + 0.01)
cmap = mpl.cm.get_cmap("plasma")
plt.close()
fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
fig.suptitle(title)
fig.set_size_inches(17.0, len(step_ids))
plt.xlim(-0.01, end)
for i, step in enumerate(step_ids):
step_name = step.decode()
ax = axs[i]
ax.set_ylabel(step_name)
ax.set_ylim(0, 1)
ax.set_yticks([])
ax.set_xlabel("time (s)")
ax.set_xticklabels([])
ax.grid(which="both", axis="x", color="k", linestyle=":")
# Get timings and annotation for the given step
entries_mask = np.squeeze(steps==step)
serie = np.unique(times[entries_mask], axis=0)
annotations = values[entries_mask]
ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
if annotate:
for j, (start, width) in enumerate(serie):
annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
horizontalalignment='left', verticalalignment='center')
if save:
plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")
मैप किए गए फ़ंक्शन के लिए रैपर का इस्तेमाल करें
मैप किए गए फ़ंक्शन को उत्सुक संदर्भ में चलाने के लिए, आपको उन्हें tf.py_function
कॉल के अंदर लपेटना होगा।
def map_decorator(func):
def wrapper(steps, times, values):
# Use a tf.py_function to prevent auto-graph from compiling the method
return tf.py_function(
func,
inp=(steps, times, values),
Tout=(steps.dtype, times.dtype, values.dtype)
)
return wrapper
पाइपलाइन तुलना
_batch_map_num_items = 50
def dataset_generator_fun(*args):
return TimeMeasuredDataset(num_samples=_batch_map_num_items)
अनाड़ी
@map_decorator
def naive_map(steps, times, values):
map_enter = time.perf_counter()
time.sleep(0.001) # Time consuming step
time.sleep(0.0001) # Memory consuming step
map_elapsed = time.perf_counter() - map_enter
return (
tf.concat((steps, [["Map"]]), axis=0),
tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
tf.concat((values, [values[-1]]), axis=0)
)
naive_timeline = timelined_benchmark(
tf.data.Dataset.range(2)
.flat_map(dataset_generator_fun)
.map(naive_map)
.batch(_batch_map_num_items, drop_remainder=True)
.unbatch(),
5
)
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version. Instructions for updating: Use output_signature instead WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version. Instructions for updating: Use output_signature instead Execution time: 13.13538893499981प्लेसहोल्डर33
अनुकूलित
@map_decorator
def time_consuming_map(steps, times, values):
map_enter = time.perf_counter()
time.sleep(0.001 * values.shape[0]) # Time consuming step
map_elapsed = time.perf_counter() - map_enter
return (
tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
)
@map_decorator
def memory_consuming_map(steps, times, values):
map_enter = time.perf_counter()
time.sleep(0.0001 * values.shape[0]) # Memory consuming step
map_elapsed = time.perf_counter() - map_enter
# Use tf.tile to handle batch dimension
return (
tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
)
optimized_timeline = timelined_benchmark(
tf.data.Dataset.range(2)
.interleave( # Parallelize data reading
dataset_generator_fun,
num_parallel_calls=tf.data.AUTOTUNE
)
.batch( # Vectorize your mapped function
_batch_map_num_items,
drop_remainder=True)
.map( # Parallelize map transformation
time_consuming_map,
num_parallel_calls=tf.data.AUTOTUNE
)
.cache() # Cache data
.map( # Reduce memory usage
memory_consuming_map,
num_parallel_calls=tf.data.AUTOTUNE
)
.prefetch( # Overlap producer and consumer works
tf.data.AUTOTUNE
)
.unbatch(),
5
)
Execution time: 6.723691489999965
draw_timeline(naive_timeline, "Naive", 15)
draw_timeline(optimized_timeline, "Optimized", 15)