عرض على TensorFlow.org | تشغيل في Google Colab | عرض المصدر على جيثب | تحميل دفتر |
ملخص
يمكن لوحدات معالجة الرسومات (GPU) و (TPU) تقليل الوقت المطلوب بشكل جذري لتنفيذ خطوة تدريب واحدة. يتطلب تحقيق ذروة الأداء وجود خط إدخال فعال يوفر البيانات للخطوة التالية قبل انتهاء الخطوة الحالية. تساعد واجهة برمجة تطبيقات tf.data
في بناء خطوط إدخال مرنة وفعالة. يوضح هذا المستند كيفية استخدام tf.data
API لبناء خطوط أنابيب إدخال TensorFlow عالية الأداء.
قبل المتابعة ، راجع دليل إنشاء خطوط إدخال TensorFlow للتعرف على كيفية استخدام tf.data
API.
موارد
يثبت
import tensorflow as tf
import time
خلال هذا الدليل ، سوف تتكرر عبر مجموعة بيانات وتقيس الأداء. قد يكون من الصعب إجراء مقاييس أداء قابلة للتكرار. تشمل العوامل المختلفة التي تؤثر على قابلية التكاثر ما يلي:
- الحمل الحالي لوحدة المعالجة المركزية
- حركة مرور الشبكة
- آليات معقدة ، مثل ذاكرة التخزين المؤقت
للحصول على معيار قابل للتكرار ، ستقوم ببناء مثال مصطنع.
مجموعة البيانات
ابدأ بتحديد فئة موروثة من tf.data.Dataset
تسمى ArtificialDataset
. مجموعة البيانات هذه:
- يولد
num_samples
العينات (الافتراضي هو 3) - ينام لبعض الوقت قبل العنصر الأول لمحاكاة فتح ملف
- ينام لبعض الوقت قبل إنتاج كل عنصر لمحاكاة قراءة البيانات من ملف
class ArtificialDataset(tf.data.Dataset):
def _generator(num_samples):
# Opening the file
time.sleep(0.03)
for sample_idx in range(num_samples):
# Reading data (line, record) from the file
time.sleep(0.015)
yield (sample_idx,)
def __new__(cls, num_samples=3):
return tf.data.Dataset.from_generator(
cls._generator,
output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
args=(num_samples,)
)
تشبه مجموعة البيانات هذه tf.data.Dataset.range
الأول ، مضيفًا تأخيرًا ثابتًا في بداية كل عينة وفيما بينها.
حلقة التدريب
بعد ذلك ، اكتب حلقة تدريب وهمية تقيس الوقت المستغرق للتكرار على مجموعة بيانات. يتم محاكاة وقت التدريب.
def benchmark(dataset, num_epochs=2):
start_time = time.perf_counter()
for epoch_num in range(num_epochs):
for sample in dataset:
# Performing a training step
time.sleep(0.01)
print("Execution time:", time.perf_counter() - start_time)
تحسين الأداء
لعرض كيفية تحسين الأداء ، ستقوم بتحسين أداء مجموعة البيانات ArtificialDataset
.
النهج الساذج
ابدأ بخط أنابيب ساذج بدون حيل ، وقم بالتكرار على مجموعة البيانات كما هي.
benchmark(ArtificialDataset())
Execution time: 0.26497629899995445
تحت الغطاء ، هذه هي الطريقة التي أمضيت بها وقت التنفيذ:
توضح الحبكة أن تنفيذ خطوة تدريبية تتضمن:
- فتح ملف إذا لم يتم فتحه بعد
- إحضار إدخال بيانات من الملف
- استخدام البيانات للتدريب
ومع ذلك ، في تنفيذ متزامن ساذج مثل هنا ، بينما يقوم خط الأنابيب الخاص بك بجلب البيانات ، يكون نموذجك في وضع الخمول. على العكس من ذلك ، أثناء تدريب النموذج الخاص بك ، يكون خط أنابيب الإدخال في وضع الخمول. وبالتالي فإن وقت خطوة التدريب هو مجموع أوقات الفتح والقراءة والتدريب.
تعتمد الأقسام التالية على خط أنابيب الإدخال هذا ، مما يوضح أفضل الممارسات لتصميم أنابيب إدخال TensorFlow ذات الأداء العالي.
الجلب المسبق
يتداخل الجلب المسبق مع المعالجة المسبقة وتنفيذ النموذج لخطوة التدريب. أثناء تنفيذ النموذج لخطوات التدريب ، يقرأ خط أنابيب الإدخال البيانات للخطوة s
s+1
. يؤدي القيام بذلك إلى تقليل وقت الخطوة إلى الحد الأقصى (على عكس المجموع) للتدريب والوقت الذي يستغرقه استخراج البيانات.
توفر واجهة برمجة تطبيقات tf.data
تحويل tf.data.Dataset.prefetch
. يمكن استخدامه لفصل الوقت الذي يتم فيه إنتاج البيانات عن وقت استهلاك البيانات. على وجه الخصوص ، يستخدم التحويل مؤشر ترابط في الخلفية ومخزنًا مؤقتًا داخليًا للجلب المسبق للعناصر من مجموعة بيانات الإدخال قبل الوقت المطلوب. يجب أن يكون عدد العناصر المطلوب الجلب المسبق مساويًا (أو ربما أكبر من) عدد الدُفعات المستهلكة في خطوة تدريب واحدة. يمكنك إما ضبط هذه القيمة يدويًا ، أو تعيينها على tf.data.AUTOTUNE
، والذي سيطلب من وقت تشغيل tf.data
ضبط القيمة ديناميكيًا في وقت التشغيل.
لاحظ أن تحويل الجلب المسبق يوفر فوائد في أي وقت توجد فيه فرصة لتداخل عمل "المنتج" مع عمل "المستهلك".
benchmark(
ArtificialDataset()
.prefetch(tf.data.AUTOTUNE)
)
Execution time: 0.21731788600027357
الآن ، كما يظهر مخطط وقت تنفيذ البيانات ، أثناء تشغيل خطوة التدريب للعينة 0 ، يقرأ خط أنابيب الإدخال البيانات الخاصة بالعينة 1 ، وما إلى ذلك.
موازاة استخراج البيانات
في إعدادات العالم الحقيقي ، قد يتم تخزين بيانات الإدخال عن بُعد (على سبيل المثال ، على Google Cloud Storage أو HDFS). قد يتعرض خط أنابيب مجموعة البيانات الذي يعمل بشكل جيد عند قراءة البيانات محليًا للاختناق في الإدخال / الإخراج عند قراءة البيانات عن بُعد بسبب الاختلافات التالية بين التخزين المحلي والتخزين البعيد:
- الوقت إلى البايت الأول : يمكن أن تستغرق قراءة البايت الأول من الملف من التخزين البعيد أوامر من حيث الحجم أطول من التخزين المحلي.
- معدل نقل القراءة : بينما يوفر التخزين البعيد عادةً نطاقًا تردديًا مجمعًا كبيرًا ، قد تتمكن قراءة ملف واحد فقط من استخدام جزء صغير من هذا النطاق الترددي.
بالإضافة إلى ذلك ، بمجرد تحميل البايت الخام في الذاكرة ، قد يكون من الضروري أيضًا إلغاء تسلسل و / أو فك تشفير البيانات (مثل protobuf ) ، الأمر الذي يتطلب حسابًا إضافيًا. هذا الحمل موجود بغض النظر عما إذا كانت البيانات مخزنة محليًا أو عن بُعد ، ولكن يمكن أن تكون أسوأ في الحالة البعيدة إذا لم يتم جلب البيانات مسبقًا بشكل فعال.
للتخفيف من تأثير النفقات العامة المختلفة لاستخراج البيانات ، يمكن استخدام التحويل tf.data.Dataset.interleave
خطوة تحميل البيانات ، وتشذير محتويات مجموعات البيانات الأخرى (مثل قارئات ملفات البيانات). يمكن تحديد عدد مجموعات البيانات المراد تداخلها بواسطة الوسيطة cycle_length
، بينما يمكن تحديد مستوى التوازي بواسطة الوسيطة num_parallel_calls
. على غرار تحويل prefetch
، يدعم تحويل interleave
tf.data.AUTOTUNE
، والذي سيفوض القرار حول مستوى التوازي الذي يجب استخدامه لوقت تشغيل tf.data
.
تشذير متسلسل
تجعل الوسائط الافتراضية tf.data.Dataset.interleave
تشذير عينات مفردة من مجموعتي بيانات بالتتابع.
benchmark(
tf.data.Dataset.range(2)
.interleave(lambda _: ArtificialDataset())
)
Execution time: 0.4987426460002098
يسمح مخطط وقت تنفيذ البيانات هذا بعرض سلوك تحويل interleave
، وجلب العينات بدلاً من ذلك من مجموعتي البيانات المتاحتين. ومع ذلك ، لا يتم تضمين تحسين الأداء هنا.
تشذير متوازي
الآن ، استخدم وسيطة num_parallel_calls
interleave
يؤدي هذا إلى تحميل مجموعات بيانات متعددة بشكل متوازٍ ، مما يقلل من وقت انتظار فتح الملفات.
benchmark(
tf.data.Dataset.range(2)
.interleave(
lambda _: ArtificialDataset(),
num_parallel_calls=tf.data.AUTOTUNE
)
)
Execution time: 0.283668874000341
هذه المرة ، كما يظهر مخطط وقت تنفيذ البيانات ، تتم قراءة مجموعتي البيانات بشكل متوازي ، مما يقلل من وقت معالجة البيانات العالمي.
موازاة تحويل البيانات
عند تحضير البيانات ، قد تحتاج عناصر الإدخال إلى المعالجة المسبقة. تحقيقا لهذه الغاية ، تقدم واجهة برمجة تطبيقات tf.data
تحويل tf.data.Dataset.map
، والذي يطبق وظيفة محددة من قبل المستخدم على كل عنصر من عناصر مجموعة بيانات الإدخال. نظرًا لأن عناصر الإدخال مستقلة عن بعضها البعض ، يمكن موازاة المعالجة المسبقة عبر أنوية وحدة المعالجة المركزية المتعددة. لجعل هذا ممكنًا ، على غرار عمليات prefetch
والتحويلات num_parallel_calls
، يوفر تحويل map
وسيطة interleave
لتحديد مستوى التوازي.
يعتمد اختيار أفضل قيمة لوسيطة num_parallel_calls
على أجهزتك ، وخصائص بيانات التدريب (مثل حجمها وشكلها) ، وتكلفة وظيفة الخريطة ، والمعالجة الأخرى التي تحدث على وحدة المعالجة المركزية في نفس الوقت. إرشاد بسيط هو استخدام عدد النوى المتاحة لوحدة المعالجة المركزية. ومع ذلك ، بالنسبة للتحويل prefetch
interleave
، فإن تحويل map
يدعم tf.data.AUTOTUNE
الذي سيفوض القرار حول مستوى التوازي الذي يجب استخدامه لوقت تشغيل tf.data
.
def mapped_function(s):
# Do some hard pre-processing
tf.py_function(lambda: time.sleep(0.03), [], ())
return s
رسم الخرائط المتسلسلة
ابدأ باستخدام تحويل map
بدون التوازي كمثال أساسي.
benchmark(
ArtificialDataset()
.map(mapped_function)
)
Execution time: 0.4505277170001136
بالنسبة للنهج الساذج ، هنا ، كما تظهر الحبكة ، يتم جمع الأوقات التي يتم قضاؤها في الفتح والقراءة والمعالجة المسبقة (رسم الخرائط) والتدريب معًا لتكرار واحد.
رسم الخرائط المتوازية
الآن ، استخدم نفس وظيفة المعالجة المسبقة ولكن قم بتطبيقها بالتوازي على عينات متعددة.
benchmark(
ArtificialDataset()
.map(
mapped_function,
num_parallel_calls=tf.data.AUTOTUNE
)
)
Execution time: 0.2839677860001757
كما يوضح مخطط البيانات ، تتداخل خطوات المعالجة المسبقة ، مما يقلل الوقت الإجمالي لتكرار واحد.
التخزين المؤقت
يمكن أن يخزن تحويل tf.data.Dataset.cache
مجموعة بيانات مؤقتًا ، إما في الذاكرة أو في التخزين المحلي. سيوفر هذا بعض العمليات (مثل فتح الملف وقراءة البيانات) من التنفيذ خلال كل حقبة.
benchmark(
ArtificialDataset()
.map( # Apply time consuming operations before cache
mapped_function
).cache(
),
5
)
Execution time: 0.3848854380003104
هنا ، يُظهر مخطط وقت تنفيذ البيانات أنه عند تخزين مجموعة بيانات مؤقتًا ، يتم تنفيذ التحويلات قبل cache
(مثل فتح الملف وقراءة البيانات) فقط خلال الحقبة الأولى. ستعيد العهود التالية استخدام البيانات المخزنة مؤقتًا بواسطة تحويل cache
.
إذا كانت الوظيفة المحددة من قبل المستخدم التي تم تمريرها في تحويل map
باهظة الثمن ، فقم بتطبيق تحويل cache
بعد تحويل map
طالما أن مجموعة البيانات الناتجة لا تزال تتناسب مع الذاكرة أو التخزين المحلي. إذا زادت الوظيفة التي يحددها المستخدم من المساحة المطلوبة لتخزين مجموعة البيانات بما يتجاوز سعة ذاكرة التخزين المؤقت ، فقم إما بتطبيقها بعد تحويل cache
أو ضع في اعتبارك المعالجة المسبقة لبياناتك قبل مهمة التدريب لتقليل استخدام الموارد.
رسم الخرائط الموجهة
إن استدعاء وظيفة معرفة من قبل المستخدم تم تمريرها إلى تحويل map
له عبء متعلق بجدولة وتنفيذ الوظيفة المحددة بواسطة المستخدم. قم بتوجيه الوظيفة المعرفة من قبل المستخدم (أي اجعلها تعمل عبر مجموعة من المدخلات في وقت واحد) وقم بتطبيق تحويل batch
قبل تحويل map
.
لتوضيح هذه الممارسة الجيدة ، فإن مجموعة البيانات الاصطناعية ليست مناسبة. يبلغ تأخير الجدولة حوالي 10 ميكروثانية (10e-6 ثوانٍ) ، أي أقل بكثير من عشرات المللي ثانية المستخدمة في مجموعة البيانات ArtificialDataset
، وبالتالي يصعب رؤية تأثيرها.
في هذا المثال ، استخدم الدالة الأساسية tf.data.Dataset.range
وقم بتبسيط حلقة التدريب إلى أبسط أشكالها.
fast_dataset = tf.data.Dataset.range(10000)
def fast_benchmark(dataset, num_epochs=2):
start_time = time.perf_counter()
for _ in tf.data.Dataset.range(num_epochs):
for _ in dataset:
pass
tf.print("Execution time:", time.perf_counter() - start_time)
def increment(x):
return x+1
رسم الخرائط العددية
fast_benchmark(
fast_dataset
# Apply function one item at a time
.map(increment)
# Batch
.batch(256)
)
Execution time: 0.2712608739998359
يوضح الرسم أعلاه ما يجري (مع عدد أقل من العينات) باستخدام طريقة رسم الخرائط العددية. يوضح أنه يتم تطبيق الوظيفة المعينة لكل عينة. في حين أن هذه الوظيفة سريعة جدًا ، إلا أن لها بعض النفقات العامة التي تؤثر على أداء الوقت.
رسم الخرائط الموجهة
fast_benchmark(
fast_dataset
.batch(256)
# Apply function on a batch of items
# The tf.Tensor.__add__ method already handle batches
.map(increment)
)
Execution time: 0.02737950600021577
هذه المرة ، يتم استدعاء الوظيفة المعينة مرة واحدة ويتم تطبيقها على مجموعة من العينات. كما يظهر مخطط وقت تنفيذ البيانات ، بينما قد تستغرق الوظيفة مزيدًا من الوقت للتنفيذ ، تظهر النفقات العامة مرة واحدة فقط ، مما يؤدي إلى تحسين أداء الوقت الإجمالي.
تقليل البصمة على الذاكرة
يحافظ عدد من التحويلات ، بما في ذلك interleave
، prefetch
، والخلط shuffle
، على مخزن مؤقت داخلي للعناصر. إذا كانت الوظيفة المعرفة من قبل المستخدم التي تم تمريرها إلى تحويل map
تغير حجم العناصر ، فإن ترتيب تحويل الخريطة والتحولات التي تقوم بها عناصر المخزن المؤقت تؤثر على استخدام الذاكرة. بشكل عام ، اختر الترتيب الذي ينتج عنه مساحة أقل للذاكرة ، ما لم يكن الترتيب المختلف مطلوبًا للأداء.
التخزين المؤقت للحسابات الجزئية
يوصى بتخزين مجموعة البيانات مؤقتًا بعد تحويل map
إلا إذا كان هذا التحويل يجعل البيانات أكبر من أن تتناسب مع الذاكرة. يمكن تحقيق المفاضلة إذا كان من الممكن تقسيم الوظيفة المعينة إلى جزأين: جزء مستهلك للوقت وجزء يستهلك ذاكرة. في هذه الحالة ، يمكنك إجراء سلسلة من التحولات كما يلي:
dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)
بهذه الطريقة ، يتم تنفيذ الجزء الذي يستغرق وقتًا طويلاً فقط خلال الحقبة الأولى ، وتتجنب استخدام مساحة كبيرة جدًا من ذاكرة التخزين المؤقت.
ملخص أفضل الممارسات
فيما يلي ملخص لأفضل الممارسات لتصميم أنابيب إدخال TensorFlow عالية الأداء:
- استخدم تحويل
prefetch
لتداخل عمل المنتج والمستهلك - قم بموازاة تحويل قراءة البيانات باستخدام تحويل
interleave
- قم بموازنة تحويل
map
عن طريق تعيين وسيطةnum_parallel_calls
- استخدم تحويل
cache
التخزين المؤقت لتخزين البيانات مؤقتًا في الذاكرة خلال المرحلة الأولى - تم تمرير الوظائف المحددة بواسطة المستخدم إلى Vectorize في تحويل
map
- قلل استخدام الذاكرة عند تطبيق تحويلات
interleave
،prefetch
، والتبديلshuffle
استنساخ الأشكال
للتعمق في فهم tf.data.Dataset
API ، يمكنك اللعب بخطوط الأنابيب الخاصة بك. يوجد أدناه الكود المستخدم لرسم الصور من هذا الدليل. يمكن أن يكون نقطة انطلاق جيدة ، مع عرض بعض الحلول للصعوبات الشائعة مثل:
- وقت التنفيذ التكاثر
- وظائف المعينة التنفيذ حريص
- تحويل
interleave
callable
import itertools
from collections import defaultdict
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
مجموعة البيانات
على غرار مجموعة البيانات ArtificialDataset
، يمكنك إنشاء مجموعة بيانات تعيد الوقت الذي تقضيه في كل خطوة.
class TimeMeasuredDataset(tf.data.Dataset):
# OUTPUT: (steps, timings, counters)
OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))
_INSTANCES_COUNTER = itertools.count() # Number of datasets generated
_EPOCHS_COUNTER = defaultdict(itertools.count) # Number of epochs done for each dataset
def _generator(instance_idx, num_samples):
epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])
# Opening the file
open_enter = time.perf_counter()
time.sleep(0.03)
open_elapsed = time.perf_counter() - open_enter
for sample_idx in range(num_samples):
# Reading data (line, record) from the file
read_enter = time.perf_counter()
time.sleep(0.015)
read_elapsed = time.perf_counter() - read_enter
yield (
[("Open",), ("Read",)],
[(open_enter, open_elapsed), (read_enter, read_elapsed)],
[(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
)
open_enter, open_elapsed = -1., -1. # Negative values will be filtered
def __new__(cls, num_samples=3):
return tf.data.Dataset.from_generator(
cls._generator,
output_types=cls.OUTPUT_TYPES,
output_shapes=cls.OUTPUT_SHAPES,
args=(next(cls._INSTANCES_COUNTER), num_samples)
)
توفر مجموعة البيانات هذه عينات من الشكل [[2, 1], [2, 2], [2, 3]]
ومن النوع [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32]
. كل عينة هي:
(
[("Open"), ("Read")],
[(t0, d), (t0, d)],
[(i, e, -1), (i, e, s)]
)
أين:
-
Open
Read
هما معرفات للخطوات -
t0
هو الطابع الزمني عندما بدأت الخطوة المقابلة -
d
هو الوقت الذي يقضيه في الخطوة المقابلة -
i
هو فهرس المثيل -
e
هو مؤشر العصر (عدد المرات التي تم فيها تكرار مجموعة البيانات) -
s
هو مؤشر العينة
حلقة التكرار
اجعل حلقة التكرار أكثر تعقيدًا قليلاً لتجميع كل التوقيتات. لن يعمل هذا إلا مع مجموعات البيانات التي تنشئ عينات كما هو مفصل أعلاه.
def timelined_benchmark(dataset, num_epochs=2):
# Initialize accumulators
steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)
start_time = time.perf_counter()
for epoch_num in range(num_epochs):
epoch_enter = time.perf_counter()
for (steps, times, values) in dataset:
# Record dataset preparation informations
steps_acc = tf.concat((steps_acc, steps), axis=0)
times_acc = tf.concat((times_acc, times), axis=0)
values_acc = tf.concat((values_acc, values), axis=0)
# Simulate training time
train_enter = time.perf_counter()
time.sleep(0.01)
train_elapsed = time.perf_counter() - train_enter
# Record training informations
steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
values_acc = tf.concat((values_acc, [values[-1]]), axis=0)
epoch_elapsed = time.perf_counter() - epoch_enter
# Record epoch informations
steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
time.sleep(0.001)
tf.print("Execution time:", time.perf_counter() - start_time)
return {"steps": steps_acc, "times": times_acc, "values": values_acc}
طريقة التآمر
أخيرًا ، حدد وظيفة قادرة على رسم مخطط زمني وفقًا للقيم التي يتم إرجاعها بواسطة دالة timelined_benchmark
.
def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
# Remove invalid entries (negative times, or empty steps) from the timelines
invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
steps = timeline['steps'][invalid_mask].numpy()
times = timeline['times'][invalid_mask].numpy()
values = timeline['values'][invalid_mask].numpy()
# Get a set of different steps, ordered by the first time they are encountered
step_ids, indices = np.stack(np.unique(steps, return_index=True))
step_ids = step_ids[np.argsort(indices)]
# Shift the starting time to 0 and compute the maximal time value
min_time = times[:,0].min()
times[:,0] = (times[:,0] - min_time)
end = max(width, (times[:,0]+times[:,1]).max() + 0.01)
cmap = mpl.cm.get_cmap("plasma")
plt.close()
fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
fig.suptitle(title)
fig.set_size_inches(17.0, len(step_ids))
plt.xlim(-0.01, end)
for i, step in enumerate(step_ids):
step_name = step.decode()
ax = axs[i]
ax.set_ylabel(step_name)
ax.set_ylim(0, 1)
ax.set_yticks([])
ax.set_xlabel("time (s)")
ax.set_xticklabels([])
ax.grid(which="both", axis="x", color="k", linestyle=":")
# Get timings and annotation for the given step
entries_mask = np.squeeze(steps==step)
serie = np.unique(times[entries_mask], axis=0)
annotations = values[entries_mask]
ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
if annotate:
for j, (start, width) in enumerate(serie):
annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
horizontalalignment='left', verticalalignment='center')
if save:
plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")
استخدم الأغلفة للوظيفة المعينة
لتشغيل وظيفة تعيين في سياق حريص ، عليك لفها داخل استدعاء tf.py_function
.
def map_decorator(func):
def wrapper(steps, times, values):
# Use a tf.py_function to prevent auto-graph from compiling the method
return tf.py_function(
func,
inp=(steps, times, values),
Tout=(steps.dtype, times.dtype, values.dtype)
)
return wrapper
مقارنة خطوط الأنابيب
_batch_map_num_items = 50
def dataset_generator_fun(*args):
return TimeMeasuredDataset(num_samples=_batch_map_num_items)
ساذج
@map_decorator
def naive_map(steps, times, values):
map_enter = time.perf_counter()
time.sleep(0.001) # Time consuming step
time.sleep(0.0001) # Memory consuming step
map_elapsed = time.perf_counter() - map_enter
return (
tf.concat((steps, [["Map"]]), axis=0),
tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
tf.concat((values, [values[-1]]), axis=0)
)
naive_timeline = timelined_benchmark(
tf.data.Dataset.range(2)
.flat_map(dataset_generator_fun)
.map(naive_map)
.batch(_batch_map_num_items, drop_remainder=True)
.unbatch(),
5
)
WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version. Instructions for updating: Use output_signature instead WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version. Instructions for updating: Use output_signature instead Execution time: 13.13538893499981
المحسن
@map_decorator
def time_consuming_map(steps, times, values):
map_enter = time.perf_counter()
time.sleep(0.001 * values.shape[0]) # Time consuming step
map_elapsed = time.perf_counter() - map_enter
return (
tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
)
@map_decorator
def memory_consuming_map(steps, times, values):
map_enter = time.perf_counter()
time.sleep(0.0001 * values.shape[0]) # Memory consuming step
map_elapsed = time.perf_counter() - map_enter
# Use tf.tile to handle batch dimension
return (
tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
)
optimized_timeline = timelined_benchmark(
tf.data.Dataset.range(2)
.interleave( # Parallelize data reading
dataset_generator_fun,
num_parallel_calls=tf.data.AUTOTUNE
)
.batch( # Vectorize your mapped function
_batch_map_num_items,
drop_remainder=True)
.map( # Parallelize map transformation
time_consuming_map,
num_parallel_calls=tf.data.AUTOTUNE
)
.cache() # Cache data
.map( # Reduce memory usage
memory_consuming_map,
num_parallel_calls=tf.data.AUTOTUNE
)
.prefetch( # Overlap producer and consumer works
tf.data.AUTOTUNE
)
.unbatch(),
5
)
Execution time: 6.723691489999965
draw_timeline(naive_timeline, "Naive", 15)
draw_timeline(optimized_timeline, "Optimized", 15)