TensorFlow.org'da görüntüleyin | Google Colab'da çalıştırın | Kaynağı GitHub'da görüntüleyin | Not defterini indir |
genel bakış
GPU'lar ve TPU'lar, tek bir eğitim adımını yürütmek için gereken süreyi önemli ölçüde azaltabilir. En yüksek performansı elde etmek, mevcut adım bitmeden bir sonraki adım için veri sağlayan verimli bir girdi hattı gerektirir. tf.data
API, esnek ve verimli girdi ardışık düzenleri oluşturmaya yardımcı olur. Bu belge, yüksek performanslı TensorFlow giriş işlem hatları oluşturmak için tf.data
API'sinin nasıl kullanılacağını gösterir.
Devam etmeden önce, tf.data
API'sinin nasıl kullanılacağını öğrenmek için TensorFlow giriş işlem hatları oluşturma kılavuzuna bakın.
Kaynaklar
- TensorFlow giriş işlem hatları oluşturun
-
tf.data.Dataset
API'si - TF Profiler ile
tf.data
performansını analiz edin
Kurmak
import tensorflow as tf
import time
Bu kılavuz boyunca, bir veri kümesini yineleyecek ve performansı ölçeceksiniz. Tekrarlanabilir performans kıyaslamaları yapmak zor olabilir. Tekrarlanabilirliği etkileyen farklı faktörler şunları içerir:
- Geçerli CPU yükü
- ağ trafiği
- Önbellek gibi karmaşık mekanizmalar
Tekrarlanabilir bir kıyaslama elde etmek için yapay bir örnek oluşturacaksınız.
veri kümesi
ArtificialDataset
tf.data.Dataset
adlı bir sınıf tanımlayarak başlayın. Bu veri kümesi:
-
num_samples
örnekleri oluşturur (varsayılan 3'tür) - Bir dosyayı açmayı simüle etmek için ilk öğeden önce bir süre uyur
- Bir dosyadan veri okumayı simüle etmek için her bir öğeyi üretmeden önce bir süre uyur
class ArtificialDataset(tf.data.Dataset):
def _generator(num_samples):
# Opening the file
time.sleep(0.03)
for sample_idx in range(num_samples):
# Reading data (line, record) from the file
time.sleep(0.015)
yield (sample_idx,)
def __new__(cls, num_samples=3):
return tf.data.Dataset.from_generator(
cls._generator,
output_signature = tf.TensorSpec(shape = (1,), dtype = tf.int64),
args=(num_samples,)
)
Bu veri kümesi, her örneğin başına ve arasına sabit bir gecikme ekleyerek tf.data.Dataset.range
benzer.
eğitim döngüsü
Ardından, bir veri kümesi üzerinde yinelemenin ne kadar sürdüğünü ölçen yapay bir eğitim döngüsü yazın. Eğitim süresi simüle edilir.
def benchmark(dataset, num_epochs=2):
start_time = time.perf_counter()
for epoch_num in range(num_epochs):
for sample in dataset:
# Performing a training step
time.sleep(0.01)
print("Execution time:", time.perf_counter() - start_time)
Performansı optimize edin
Performansın nasıl optimize edilebileceğini göstermek için ArtificialDataset
performansını iyileştireceksiniz.
naif yaklaşım
Veri kümesini olduğu gibi yineleyerek, hile kullanmadan saf bir işlem hattıyla başlayın.
benchmark(ArtificialDataset())
tutucu4 l10n-yerExecution time: 0.26497629899995445
Kaputun altında, yürütme süreniz şu şekilde harcandı:
Çizim, bir eğitim adımı gerçekleştirmenin şunları içerdiğini gösterir:
- Henüz açılmamış bir dosyanın açılması
- Dosyadan bir veri girişi getirme
- Verilerin eğitim için kullanılması
Ancak, buradaki gibi saf bir eşzamanlı uygulamada, boru hattınız verileri alırken modeliniz boşta oturuyor. Tersine, modeliniz eğitim alırken, giriş boru hattı boşta oturuyor. Eğitim adım süresi böylece açılış, okuma ve eğitim sürelerinin toplamıdır.
Sonraki bölümler, bu girdi ardışık düzenini temel alarak, performanslı TensorFlow girdi ardışık düzenlerini tasarlamak için en iyi uygulamaları gösterir.
ön yükleme
Önceden getirme, bir eğitim adımının ön işlemesi ve model yürütmesiyle örtüşür. Model s
eğitim adımını yürütürken, girdi ardışık düzeni s+1
adımı için verileri okuyor. Bunu yapmak, adım süresini (toplamın aksine) eğitimin maksimum değerine ve verilerin çıkarılması için gereken süreye azaltır.
tf.data
API, tf.data.Dataset.prefetch
dönüşümünü sağlar. Verinin üretildiği zamanı, verinin tüketildiği zamandan ayırmak için kullanılabilir. Özellikle, dönüşüm, girdi veri kümesindeki öğeleri talep edilmeden önce önceden getirmek için bir arka plan iş parçacığı ve bir dahili arabellek kullanır. Önceden getirilecek öğelerin sayısı, tek bir eğitim adımı tarafından tüketilen partilerin sayısına eşit (veya muhtemelen daha fazla) olmalıdır. Bu değeri manuel olarak ayarlayabilir veya tf.data.AUTOTUNE
olarak ayarlayabilirsiniz; bu, tf.data
çalışma zamanının değeri çalışma zamanında dinamik olarak ayarlamasını isteyecektir.
Önceden getirme dönüşümünün, bir "üreticinin" işiyle bir "tüketicinin" işinin örtüşmesi için bir fırsat olduğu her zaman fayda sağladığını unutmayın.
benchmark(
ArtificialDataset()
.prefetch(tf.data.AUTOTUNE)
)
tutucu6 l10n-yerExecution time: 0.21731788600027357
Şimdi, veri yürütme zaman grafiğinin gösterdiği gibi, eğitim adımı örnek 0 için çalışırken, giriş ardışık düzeni örnek 1 için verileri okuyor ve bu böyle devam ediyor.
Paralelleştirme veri çıkarma
Gerçek dünya ortamında, giriş verileri uzaktan depolanabilir (örneğin, Google Cloud Storage veya HDFS'de). Verileri yerel olarak okurken iyi çalışan bir veri kümesi ardışık düzeni, yerel ve uzak depolama arasındaki aşağıdaki farklar nedeniyle verileri uzaktan okurken G/Ç'de darboğaz oluşturabilir:
- İlk bayt süresi : Bir dosyanın ilk baytını uzak depolamadan okumak, yerel depolamadan çok daha uzun siparişler alabilir.
- Okuma verimi : Uzak depolama tipik olarak büyük bir toplam bant genişliği sunarken, tek bir dosyayı okumak bu bant genişliğinin yalnızca küçük bir kısmını kullanabilir.
Ek olarak, ham baytlar belleğe yüklendikten sonra, ek hesaplama gerektiren verilerin seri hale getirilmesi ve/veya şifresinin çözülmesi (örneğin protobuf ) gerekli olabilir. Bu ek yük, verilerin yerel olarak mı yoksa uzaktan mı depolandığına bakılmaksızın mevcuttur, ancak veriler etkin bir şekilde önceden getirilmezse uzak durumda daha kötü olabilir.
Çeşitli veri çıkarma genel giderlerinin etkisini azaltmak için, tf.data.Dataset.interleave
dönüşümü, diğer veri kümelerinin (veri dosyası okuyucuları gibi) içeriklerini araya ekleyerek veri yükleme adımını paralel hale getirmek için kullanılabilir. Çakışacak veri kümelerinin sayısı cycle_length
bağımsız değişkeni ile belirtilebilirken, paralellik düzeyi num_parallel_calls
bağımsız değişkeni tarafından belirtilebilir. Önceden prefetch
dönüşümüne benzer şekilde, interleave
ekleme dönüşümü tf.data.AUTOTUNE
destekler ve bu, tf.data
çalışma zamanına hangi düzeyde paralellik kullanılacağına ilişkin kararı devreder.
sıralı serpiştirme
tf.data.Dataset.interleave
dönüşümünün varsayılan argümanları, iki veri kümesinden tek örnekleri sırayla serpiştirmesini sağlar.
benchmark(
tf.data.Dataset.range(2)
.interleave(lambda _: ArtificialDataset())
)
tutucu8 l10n-yerExecution time: 0.4987426460002098
Bu veri yürütme zaman grafiği, mevcut iki veri kümesinden alternatif olarak örnekler getirerek, interleave
ekleme dönüşümünün davranışını sergilemeye izin verir. Ancak burada herhangi bir performans iyileştirmesi söz konusu değildir.
paralel serpiştirme
Şimdi, interleave
ekleme dönüşümünün num_parallel_calls
bağımsız değişkenini kullanın. Bu, birden çok veri kümesini paralel olarak yükleyerek dosyaların açılmasını bekleyen süreyi azaltır.
benchmark(
tf.data.Dataset.range(2)
.interleave(
lambda _: ArtificialDataset(),
num_parallel_calls=tf.data.AUTOTUNE
)
)
tutucu10 l10n-yerExecution time: 0.283668874000341
Bu kez, veri yürütme zaman grafiğinin gösterdiği gibi, iki veri kümesinin okunması paralel hale getirilerek küresel veri işleme süresi azaltılır.
Paralelleştirme veri dönüşümü
Veri hazırlanırken girdi öğelerinin önceden işlenmesi gerekebilir. Bu amaçla, tf.data
API, giriş veri kümesinin her bir öğesine kullanıcı tanımlı bir işlev uygulayan tf.data.Dataset.map
dönüşümünü sunar. Giriş öğeleri birbirinden bağımsız olduğundan, ön işleme birden çok CPU çekirdeği arasında paralel hale getirilebilir. Bunu mümkün kılmak için, prefetch
ve interleave
ekleme dönüşümlerine benzer şekilde, map
dönüşümü paralellik düzeyini belirtmek için num_parallel_calls
argümanını sağlar.
num_parallel_calls
argümanı için en iyi değeri seçmek, donanımınıza, eğitim verilerinizin özelliklerine (boyutu ve şekli gibi), harita işlevinizin maliyetine ve aynı anda CPU'da başka hangi işlemlerin gerçekleştiğine bağlıdır. Basit bir buluşsal yöntem, mevcut CPU çekirdeği sayısını kullanmaktır. Ancak, prefetch
ve interleave
ekleme dönüşümüne gelince, map
dönüşümü tf.data.AUTOTUNE
destekler ve bu da tf.data
çalışma zamanına hangi düzeyde paralellik kullanılacağına ilişkin kararı devreder.
def mapped_function(s):
# Do some hard pre-processing
tf.py_function(lambda: time.sleep(0.03), [], ())
return s
sıralı eşleme
Temel örnek olarak paralellik olmadan map
dönüşümünü kullanarak başlayın.
benchmark(
ArtificialDataset()
.map(mapped_function)
)
tutucu13 l10n-yerExecution time: 0.4505277170001136
Saf yaklaşıma gelince, burada, grafiğin gösterdiği gibi, açma, okuma, ön işleme (haritalama) ve eğitim adımları için harcanan süreler tek bir yineleme için toplanır.
paralel haritalama
Şimdi, aynı ön işleme işlevini kullanın, ancak bunu birden çok örneğe paralel olarak uygulayın.
benchmark(
ArtificialDataset()
.map(
mapped_function,
num_parallel_calls=tf.data.AUTOTUNE
)
)
tutucu15 l10n-yerExecution time: 0.2839677860001757
Veri grafiğinin gösterdiği gibi, ön işleme adımları örtüşerek tek bir yineleme için toplam süreyi azaltır.
Önbelleğe almak
tf.data.Dataset.cache
dönüşümü, bellekte veya yerel depolamada bir veri kümesini önbelleğe alabilir. Bu, bazı işlemlerin (dosya açma ve veri okuma gibi) her çağda yürütülmesini önleyecektir.
benchmark(
ArtificialDataset()
.map( # Apply time consuming operations before cache
mapped_function
).cache(
),
5
)
tutucu17 l10n-yerExecution time: 0.3848854380003104
Burada, veri yürütme zaman grafiği, bir veri kümesini önbelleğe aldığınızda, cache
önceki dönüşümlerin (dosya açma ve veri okuma gibi) yalnızca ilk dönem sırasında yürütüldüğünü gösterir. Sonraki dönemler, cache
dönüşümü tarafından önbelleğe alınan verileri yeniden kullanacak.
map
dönüşümüne geçirilen kullanıcı tanımlı işlev pahalıysa, elde edilen veri kümesi belleğe veya yerel depolamaya sığabildiği sürece map
dönüşümünden sonra cache
dönüşümünü uygulayın. Kullanıcı tanımlı işlev, veri kümesini önbellek kapasitesinin ötesinde depolamak için gereken alanı artırırsa, bunu cache
dönüştürmesinden sonra uygulayın veya kaynak kullanımını azaltmak için eğitim işinizden önce verilerinizi önceden işlemeyi düşünün.
haritalamayı vektörleştirme
map
dönüşümüne geçirilen kullanıcı tanımlı bir işlevi çağırmak, kullanıcı tanımlı işlevi zamanlama ve yürütmeyle ilgili ek yüke sahiptir. Kullanıcı tanımlı işlevi vektörleştirin (yani, aynı anda bir grup girdi üzerinde çalışmasını sağlayın) ve batch
dönüştürmeyi map
dönüştürmesinden önce uygulayın.
Bu iyi uygulamayı göstermek için yapay veri kümeniz uygun değildir. Programlama gecikmesi yaklaşık 10 mikrosaniyedir (10e-6 saniye), bu, ArtificialDataset
Veri Kümesi'nde kullanılan onlarca milisaniyeden çok daha azdır ve bu nedenle etkisini görmek zordur.
Bu örnek için, temel tf.data.Dataset.range
işlevini kullanın ve eğitim döngüsünü en basit biçimine basitleştirin.
fast_dataset = tf.data.Dataset.range(10000)
def fast_benchmark(dataset, num_epochs=2):
start_time = time.perf_counter()
for _ in tf.data.Dataset.range(num_epochs):
for _ in dataset:
pass
tf.print("Execution time:", time.perf_counter() - start_time)
def increment(x):
return x+1
skaler haritalama
fast_benchmark(
fast_dataset
# Apply function one item at a time
.map(increment)
# Batch
.batch(256)
)
tutucu20 l10n-yerExecution time: 0.2712608739998359
Yukarıdaki çizim, skaler haritalama yöntemini kullanarak (daha az örnekle) neler olduğunu göstermektedir. Her bir örnek için eşlenen işlevin uygulandığını gösterir. Bu işlev çok hızlı olsa da, zaman performansını etkileyen bazı ek yükleri vardır.
vektörleştirilmiş eşleme
fast_benchmark(
fast_dataset
.batch(256)
# Apply function on a batch of items
# The tf.Tensor.__add__ method already handle batches
.map(increment)
)
tutucu22 l10n-yerExecution time: 0.02737950600021577
Bu sefer, eşlenen işlev bir kez çağrılır ve bir numune partisine uygulanır. Veri yürütme zaman grafiğinin gösterdiği gibi, işlevin yürütülmesi daha fazla zaman alabilirken, ek yük yalnızca bir kez görünerek genel zaman performansını iyileştirir.
Bellek ayak izini azaltmak
interleave
, prefetch
ve shuffle
dahil olmak üzere bir dizi dönüşüm, öğelerin bir dahili arabelleğini korur. Harita dönüşümüne geçirilen kullanıcı tanımlı işlev, öğelerin boyutunu değiştirirse, map
dönüşümünün sırası ve arabelleğe alınan öğelerin dönüşümleri bellek kullanımını etkiler. Genel olarak, performans için farklı bir sıralama istenmediği sürece, daha düşük bellek ayak izi ile sonuçlanan sırayı seçin.
Kısmi hesaplamaları önbelleğe alma
Bu dönüşümün verileri belleğe sığmayacak kadar büyük hale getirmesi dışında, map
dönüşümünden sonra veri kümesinin önbelleğe alınması önerilir. Eşlenen işleviniz iki parçaya bölünebilirse, bir ödünleşim elde edilebilir: zaman alan parça ve bellek tüketen parça. Bu durumda dönüşümlerinizi aşağıdaki gibi zincirleyebilirsiniz:
dataset.map(time_consuming_mapping).cache().map(memory_consuming_mapping)
Bu şekilde, zaman alıcı kısım yalnızca ilk epoch sırasında yürütülür ve çok fazla önbellek alanı kullanmaktan kaçınırsınız.
En iyi uygulama özeti
Performanslı TensorFlow giriş ardışık düzenlerini tasarlamaya yönelik en iyi uygulamaların bir özetini burada bulabilirsiniz:
- Bir üretici ve tüketicinin çalışmalarını örtüşmek için
prefetch
dönüşümünü kullanın -
interleave
dönüşümünü kullanarak veri okuma dönüşümünü paralel hale getirin -
num_parallel_calls
bağımsız değişkenini ayarlayarakmap
dönüşümünü paralel hale getirin - İlk dönem sırasında verileri bellekte önbelleğe almak için
cache
dönüşümünü kullanın -
map
dönüşümüne aktarılan kullanıcı tanımlı işlevleri vektörleştirin -
interleave
,prefetch
veshuffle
dönüşümlerini uygularken bellek kullanımını azaltın
Rakamların çoğaltılması
tf.data.Dataset
API anlayışında daha derine inmek için kendi ardışık düzenlerinizle oynayabilirsiniz. Aşağıda, bu kılavuzdaki görüntüleri çizmek için kullanılan kod bulunmaktadır. Aşağıdakiler gibi yaygın zorluklar için bazı geçici çözümler göstererek iyi bir başlangıç noktası olabilir:
- Yürütme süresi tekrarlanabilirliği
- Eşlenen işlevler istekli yürütme
-
interleave
dönüşüm çağrılabilir
import itertools
from collections import defaultdict
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
veri kümesi
ArtificialDataset
Veri Kümesine benzer şekilde, her adımda harcanan süreyi döndüren bir veri kümesi oluşturabilirsiniz.
class TimeMeasuredDataset(tf.data.Dataset):
# OUTPUT: (steps, timings, counters)
OUTPUT_TYPES = (tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32)
OUTPUT_SHAPES = ((2, 1), (2, 2), (2, 3))
_INSTANCES_COUNTER = itertools.count() # Number of datasets generated
_EPOCHS_COUNTER = defaultdict(itertools.count) # Number of epochs done for each dataset
def _generator(instance_idx, num_samples):
epoch_idx = next(TimeMeasuredDataset._EPOCHS_COUNTER[instance_idx])
# Opening the file
open_enter = time.perf_counter()
time.sleep(0.03)
open_elapsed = time.perf_counter() - open_enter
for sample_idx in range(num_samples):
# Reading data (line, record) from the file
read_enter = time.perf_counter()
time.sleep(0.015)
read_elapsed = time.perf_counter() - read_enter
yield (
[("Open",), ("Read",)],
[(open_enter, open_elapsed), (read_enter, read_elapsed)],
[(instance_idx, epoch_idx, -1), (instance_idx, epoch_idx, sample_idx)]
)
open_enter, open_elapsed = -1., -1. # Negative values will be filtered
def __new__(cls, num_samples=3):
return tf.data.Dataset.from_generator(
cls._generator,
output_types=cls.OUTPUT_TYPES,
output_shapes=cls.OUTPUT_SHAPES,
args=(next(cls._INSTANCES_COUNTER), num_samples)
)
Bu veri kümesi, şekil [[2, 1], [2, 2], [2, 3]]
ve [tf.dtypes.string, tf.dtypes.float32, tf.dtypes.int32]
. Her örnek:
(
[("Open"), ("Read")],
[(t0, d), (t0, d)],
[(i, e, -1), (i, e, s)]
)
Neresi:
-
Open
veRead
adım tanımlayıcılarıdır -
t0
, ilgili adımın başladığı zaman damgasıdır -
d
, ilgili adımda harcanan zamandır -
i
örnek diziniyim -
e
, dönem indeksidir (veri kümesinin tekrarlanma sayısı) -
s
örnek dizindir
yineleme döngüsü
Tüm zamanlamaları toplamak için yineleme döngüsünü biraz daha karmaşık hale getirin. Bu, yalnızca yukarıda ayrıntılı olarak açıklandığı gibi örnekler üreten veri kümeleriyle çalışacaktır.
def timelined_benchmark(dataset, num_epochs=2):
# Initialize accumulators
steps_acc = tf.zeros([0, 1], dtype=tf.dtypes.string)
times_acc = tf.zeros([0, 2], dtype=tf.dtypes.float32)
values_acc = tf.zeros([0, 3], dtype=tf.dtypes.int32)
start_time = time.perf_counter()
for epoch_num in range(num_epochs):
epoch_enter = time.perf_counter()
for (steps, times, values) in dataset:
# Record dataset preparation informations
steps_acc = tf.concat((steps_acc, steps), axis=0)
times_acc = tf.concat((times_acc, times), axis=0)
values_acc = tf.concat((values_acc, values), axis=0)
# Simulate training time
train_enter = time.perf_counter()
time.sleep(0.01)
train_elapsed = time.perf_counter() - train_enter
# Record training informations
steps_acc = tf.concat((steps_acc, [["Train"]]), axis=0)
times_acc = tf.concat((times_acc, [(train_enter, train_elapsed)]), axis=0)
values_acc = tf.concat((values_acc, [values[-1]]), axis=0)
epoch_elapsed = time.perf_counter() - epoch_enter
# Record epoch informations
steps_acc = tf.concat((steps_acc, [["Epoch"]]), axis=0)
times_acc = tf.concat((times_acc, [(epoch_enter, epoch_elapsed)]), axis=0)
values_acc = tf.concat((values_acc, [[-1, epoch_num, -1]]), axis=0)
time.sleep(0.001)
tf.print("Execution time:", time.perf_counter() - start_time)
return {"steps": steps_acc, "times": times_acc, "values": values_acc}
çizim yöntemi
Son olarak, timelined_benchmark
işlevi tarafından döndürülen değerlerle bir zaman çizelgesi çizebilen bir işlev tanımlayın.
def draw_timeline(timeline, title, width=0.5, annotate=False, save=False):
# Remove invalid entries (negative times, or empty steps) from the timelines
invalid_mask = np.logical_and(timeline['times'] > 0, timeline['steps'] != b'')[:,0]
steps = timeline['steps'][invalid_mask].numpy()
times = timeline['times'][invalid_mask].numpy()
values = timeline['values'][invalid_mask].numpy()
# Get a set of different steps, ordered by the first time they are encountered
step_ids, indices = np.stack(np.unique(steps, return_index=True))
step_ids = step_ids[np.argsort(indices)]
# Shift the starting time to 0 and compute the maximal time value
min_time = times[:,0].min()
times[:,0] = (times[:,0] - min_time)
end = max(width, (times[:,0]+times[:,1]).max() + 0.01)
cmap = mpl.cm.get_cmap("plasma")
plt.close()
fig, axs = plt.subplots(len(step_ids), sharex=True, gridspec_kw={'hspace': 0})
fig.suptitle(title)
fig.set_size_inches(17.0, len(step_ids))
plt.xlim(-0.01, end)
for i, step in enumerate(step_ids):
step_name = step.decode()
ax = axs[i]
ax.set_ylabel(step_name)
ax.set_ylim(0, 1)
ax.set_yticks([])
ax.set_xlabel("time (s)")
ax.set_xticklabels([])
ax.grid(which="both", axis="x", color="k", linestyle=":")
# Get timings and annotation for the given step
entries_mask = np.squeeze(steps==step)
serie = np.unique(times[entries_mask], axis=0)
annotations = values[entries_mask]
ax.broken_barh(serie, (0, 1), color=cmap(i / len(step_ids)), linewidth=1, alpha=0.66)
if annotate:
for j, (start, width) in enumerate(serie):
annotation = "\n".join([f"{l}: {v}" for l,v in zip(("i", "e", "s"), annotations[j])])
ax.text(start + 0.001 + (0.001 * (j % 2)), 0.55 - (0.1 * (j % 2)), annotation,
horizontalalignment='left', verticalalignment='center')
if save:
plt.savefig(title.lower().translate(str.maketrans(" ", "_")) + ".svg")
Eşlenen işlev için sarmalayıcıları kullanın
Eşlenmiş işlevi istekli bir bağlamda çalıştırmak için, bunları bir tf.py_function
çağrısı içine sarmanız gerekir.
def map_decorator(func):
def wrapper(steps, times, values):
# Use a tf.py_function to prevent auto-graph from compiling the method
return tf.py_function(
func,
inp=(steps, times, values),
Tout=(steps.dtype, times.dtype, values.dtype)
)
return wrapper
Boru hatları karşılaştırması
_batch_map_num_items = 50
def dataset_generator_fun(*args):
return TimeMeasuredDataset(num_samples=_batch_map_num_items)
Toy
@map_decorator
def naive_map(steps, times, values):
map_enter = time.perf_counter()
time.sleep(0.001) # Time consuming step
time.sleep(0.0001) # Memory consuming step
map_elapsed = time.perf_counter() - map_enter
return (
tf.concat((steps, [["Map"]]), axis=0),
tf.concat((times, [[map_enter, map_elapsed]]), axis=0),
tf.concat((values, [values[-1]]), axis=0)
)
naive_timeline = timelined_benchmark(
tf.data.Dataset.range(2)
.flat_map(dataset_generator_fun)
.map(naive_map)
.batch(_batch_map_num_items, drop_remainder=True)
.unbatch(),
5
)
tutucu32 l10n-yerWARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_types is deprecated and will be removed in a future version. Instructions for updating: Use output_signature instead WARNING:tensorflow:From /tmp/ipykernel_23983/64197174.py:36: calling DatasetV2.from_generator (from tensorflow.python.data.ops.dataset_ops) with output_shapes is deprecated and will be removed in a future version. Instructions for updating: Use output_signature instead Execution time: 13.13538893499981
Optimize
@map_decorator
def time_consuming_map(steps, times, values):
map_enter = time.perf_counter()
time.sleep(0.001 * values.shape[0]) # Time consuming step
map_elapsed = time.perf_counter() - map_enter
return (
tf.concat((steps, tf.tile([[["1st map"]]], [steps.shape[0], 1, 1])), axis=1),
tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
)
@map_decorator
def memory_consuming_map(steps, times, values):
map_enter = time.perf_counter()
time.sleep(0.0001 * values.shape[0]) # Memory consuming step
map_elapsed = time.perf_counter() - map_enter
# Use tf.tile to handle batch dimension
return (
tf.concat((steps, tf.tile([[["2nd map"]]], [steps.shape[0], 1, 1])), axis=1),
tf.concat((times, tf.tile([[[map_enter, map_elapsed]]], [times.shape[0], 1, 1])), axis=1),
tf.concat((values, tf.tile([[values[:][-1][0]]], [values.shape[0], 1, 1])), axis=1)
)
optimized_timeline = timelined_benchmark(
tf.data.Dataset.range(2)
.interleave( # Parallelize data reading
dataset_generator_fun,
num_parallel_calls=tf.data.AUTOTUNE
)
.batch( # Vectorize your mapped function
_batch_map_num_items,
drop_remainder=True)
.map( # Parallelize map transformation
time_consuming_map,
num_parallel_calls=tf.data.AUTOTUNE
)
.cache() # Cache data
.map( # Reduce memory usage
memory_consuming_map,
num_parallel_calls=tf.data.AUTOTUNE
)
.prefetch( # Overlap producer and consumer works
tf.data.AUTOTUNE
)
.unbatch(),
5
)
Execution time: 6.723691489999965-yer tutucu35 l10n-yer
draw_timeline(naive_timeline, "Naive", 15)
draw_timeline(optimized_timeline, "Optimized", 15)