TensorFlow.org'da görüntüleyin | Google Colab'da çalıştırın | Kaynağı GitHub'da görüntüleyin | Not defterini indir |
tf.distribute API'leri, kullanıcıların eğitimlerini tek bir makineden birden çok makineye ölçeklendirmeleri için kolay bir yol sağlar. Modellerini ölçeklendirirken, kullanıcıların girdilerini birden fazla cihaza dağıtmaları da gerekir. tf.distribute
, girdilerinizi cihazlar arasında otomatik olarak dağıtabileceğiniz API'ler sağlar.
Bu kılavuz size, tf.distribute
API'lerini kullanarak dağıtılmış veri kümesi ve yineleyiciler oluşturmanın farklı yollarını gösterecektir. Ek olarak, aşağıdaki konular ele alınacaktır:
-
tf.distribute.Strategy.experimental_distribute_dataset
vetf.distribute.Strategy.distribute_datasets_from_function
kullanılırken kullanım, parçalama ve toplu işleme seçenekleri. - Dağıtılmış veri kümesi üzerinde yineleme yapabileceğiniz farklı yollar.
-
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
API'leri iletf.data
API'leri arasındaki farklar ve kullanıcıların kullanımlarında karşılaşabilecekleri sınırlamalar.
Bu kılavuz, Keras API'leri ile dağıtılmış girdi kullanımını kapsamaz.
Dağıtılmış Veri Kümeleri
Ölçeklendirmek üzere tf.distribute
API'lerini kullanmak için, kullanıcıların girdilerini temsil etmek üzere tf.data.Dataset
kullanmaları önerilir. tf.distribute
, tf.data.Dataset
ile verimli bir şekilde çalışacak şekilde yapılmıştır (örneğin, verilerin her hızlandırıcı cihaza otomatik olarak önceden getirilmesi) ve performans optimizasyonları uygulamaya düzenli olarak dahil edilmiştir. tf.data.Dataset
dışında bir şey kullanmak için bir kullanım durumunuz varsa, lütfen bu kılavuzun sonraki bölümüne bakın. Dağıtılmış olmayan bir eğitim döngüsünde, kullanıcılar önce bir tf.data.Dataset
örneği oluşturur ve ardından öğeler üzerinde yinelenir. Örneğin:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1yer tutucu2 l10n-yer
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Kullanıcıların, bir kullanıcının mevcut kodunda minimum değişiklikle tf.distribute
stratejisini kullanmasına izin vermek için, bir tf.data.Dataset
örneğini dağıtacak ve dağıtılmış bir veri kümesi nesnesi döndürecek iki API tanıtıldı. Bir kullanıcı daha sonra bu dağıtılmış veri kümesi örneğini yineleyebilir ve modelini daha önce olduğu gibi eğitebilir. Şimdi iki API'ye bakalım - tf.distribute.Strategy.experimental_distribute_dataset
ve tf.distribute.Strategy.distribute_datasets_from_function
daha ayrıntılı olarak:
tf.distribute.Strategy.experimental_distribute_dataset
kullanım
Bu API, girdi olarak bir tf.data.Dataset
örneği alır ve bir tf.distribute.DistributedDataset
örneği döndürür. Girdi veri kümesini, genel toplu iş boyutuna eşit bir değerle toplulaştırmanız gerekir. Bu global parti boyutu, tüm cihazlarda 1 adımda işlemek istediğiniz numune sayısıdır. Bu dağıtılmış veri kümesi üzerinde Pythonic bir şekilde yinelenebilir veya iter
kullanarak bir yineleyici oluşturabilirsiniz. Döndürülen nesne bir tf.data.Dataset
örneği değildir ve veri kümesini herhangi bir şekilde dönüştüren veya inceleyen diğer API'leri desteklemez. Girişinizi farklı replikalar üzerinden parçalamak istediğiniz belirli yöntemleriniz yoksa önerilen API budur.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
tutucu5 l10n-yerINFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>) 2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
Özellikler
yığınlama
tf.distribute
, giriş tf.data.Dataset
örneğini, küresel toplu iş boyutunun eşitlenen çoğaltma sayısına bölünmesine eşit olan yeni bir toplu iş boyutuyla yeniden toplulaştırır. Eşitlemedeki replikaların sayısı, eğitim sırasında gradyan allreduce'da yer alan cihazların sayısına eşittir. Bir kullanıcı dağıtılmış yineleyiciyi çağırdığında, next
yinelemede yineleme başına toplu veri boyutu döndürülür. Yeniden toplu veri kümesi kardinalitesi her zaman kopya sayısının bir katı olacaktır. Burada bir çift örnek var:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- Dağıtım olmadan:
- Grup 1: [0, 1, 2, 3]
- Grup 2: [4, 5]
2 kopya üzerinden dağıtım ile. Son toplu iş ([4, 5]) 2 kopya arasında bölünür.
Parti 1:
- Kopya 1:[0, 1]
- Kopya 2:[2, 3]
2. parti:
- Kopya 2: [4]
- Kopya 2: [5]
tf.data.Dataset.range(4).batch(4)
- Dağıtım olmadan:
- Grup 1: [[0], [1], [2], [3]]
- 5 kopyadan fazla dağıtım ile:
- Parti 1:
- Kopya 1: [0]
- Kopya 2: [1]
- Kopya 3: [2]
- 4 kopya: [3]
- Kopya 5: []
tf.data.Dataset.range(8).batch(4)
- Dağıtım olmadan:
- Grup 1: [0, 1, 2, 3]
- 2. Grup: [4, 5, 6, 7]
- 3 kopya üzerinden dağıtım ile:
- Parti 1:
- Kopya 1: [0, 1]
- Kopya 2: [2, 3]
- Kopya 3: []
- 2. parti:
- Kopya 1: [4, 5]
- Kopya 2: [6, 7]
- Kopya 3: []
Veri kümesini yeniden gruplandırma, çoğaltma sayısıyla doğrusal olarak artan bir alan karmaşıklığına sahiptir. Bu, çok çalışanlı eğitim kullanım durumu için giriş hattının OOM hatalarıyla karşılaşabileceği anlamına gelir.
parçalama
tf.distribute
ayrıca MultiWorkerMirroredStrategy
ve TPUStrategy
ile çoklu çalışan eğitiminde girdi veri kümesini otomatik olarak parçalar. Her veri kümesi, çalışanın CPU cihazında oluşturulur. Bir veri kümesini bir çalışan kümesi üzerinde otomatik olarak paylaşmak, her çalışana tüm veri kümesinin bir alt kümesinin atandığı anlamına gelir (doğru tf.data.experimental.AutoShardPolicy
ayarlanmışsa). Bu, her adımda, her çalışan tarafından örtüşmeyen veri kümesi öğelerinin global bir toplu iş boyutunun işlenmesini sağlamak içindir. Otomatik parçalama, tf.data.experimental.DistributeOptions
kullanılarak belirtilebilecek birkaç farklı seçeneğe sahiptir. ParameterServerStrategy
ile çok çalışanlı eğitimde otomatik parçalama olmadığını ve bu stratejiyle veri kümesi oluşturma hakkında daha fazla bilgiyi Parametre Sunucusu Stratejisi eğitiminde bulunabileceğini unutmayın.
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
tf.data.experimental.AutoShardPolicy
için ayarlayabileceğiniz üç farklı seçenek vardır:
- OTOMATİK: Bu, DOSYA tarafından parçalanmaya çalışılacağı anlamına gelen varsayılan seçenektir. Dosya tabanlı bir veri kümesi algılanmazsa, FILE tarafından parçalama girişimi başarısız olur.
tf.distribute
daha sonra DATA tarafından parçalanmaya geri dönecektir. Girdi veri kümesi dosya tabanlıysa ancak dosya sayısı çalışan sayısından azsa, birInvalidArgumentError
oluşturulacağını unutmayın. Böyle bir durumda, politikayı açıkçaAutoShardPolicy.DATA
olarak ayarlayın veya girdi kaynağınızı, dosya sayısı çalışan sayısından fazla olacak şekilde daha küçük dosyalara bölün. DOSYA: Girdi dosyalarını tüm çalışanlar üzerinde parçalamak istiyorsanız bu seçenek budur. Girdi dosyalarının sayısı çalışan sayısından çok fazlaysa ve dosyalardaki veriler eşit olarak dağıtılmışsa bu seçeneği kullanmalısınız. Bu seçeneğin dezavantajı, dosyalardaki veriler eşit olarak dağıtılmamışsa boşta çalışanlara sahip olmaktır. Dosya sayısı çalışan sayısından azsa, bir
InvalidArgumentError
ortaya çıkar. Bu olursa, politikayı açıkçaAutoShardPolicy.DATA
olarak ayarlayın. Örneğin 2 dosyayı 2 işçiye her biri 1 replika olacak şekilde dağıtalım. Dosya 1 [0, 1, 2, 3, 4, 5] içerir ve Dosya 2 [6, 7, 8, 9, 10, 11] içerir. Eşitlenen toplam çoğaltma sayısı 2 ve genel toplu iş boyutu 4 olsun.- Çalışan 0:
- Parti 1 = Kopya 1: [0, 1]
- Parti 2 = Kopya 1: [2, 3]
- Parti 3 = Kopya 1: [4]
- Parti 4 = Kopya 1: [5]
- işçi 1:
- Parti 1 = Kopya 2: [6, 7]
- Parti 2 = Kopya 2: [8, 9]
- Parti 3 = Kopya 2: [10]
- Parti 4 = Kopya 2: [11]
VERİ: Bu, öğeleri tüm çalışanlar arasında otomatik olarak paylaşacaktır. Çalışanların her biri tüm veri kümesini okuyacak ve yalnızca kendisine atanan parçayı işleyecektir. Diğer tüm parçalar atılacak. Bu genellikle, girdi dosyalarının sayısı çalışan sayısından azsa ve verilerin tüm çalışanlar arasında daha iyi paylaşılmasını istiyorsanız kullanılır. Dezavantajı, tüm veri kümesinin her çalışanda okunacak olmasıdır. Örneğin 1 dosyayı 2 işçiye dağıtalım. Dosya 1, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] içerir. Senkronize edilen toplam replika sayısı 2 olsun.
- Çalışan 0:
- Parti 1 = Kopya 1: [0, 1]
- Parti 2 = Kopya 1: [4, 5]
- Parti 3 = Kopya 1: [8, 9]
- işçi 1:
- Parti 1 = Kopya 2: [2, 3]
- Parti 2 = Kopya 2: [6, 7]
- Parti 3 = Kopya 2: [10, 11]
KAPALI: Otomatik parçalamayı kapatırsanız, her çalışan tüm verileri işler. Örneğin 1 dosyayı 2 işçiye dağıtalım. Dosya 1, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11] içerir. Senkronize edilen toplam replika sayısı 2 olsun. Ardından her çalışan aşağıdaki dağılımı görecektir:
- Çalışan 0:
- Parti 1 = Kopya 1: [0, 1]
- Parti 2 = Kopya 1: [2, 3]
- Parti 3 = Kopya 1: [4, 5]
- Parti 4 = Kopya 1: [6, 7]
- Parti 5 = Kopya 1: [8, 9]
Parti 6 = Kopya 1: [10, 11]
işçi 1:
Parti 1 = Kopya 2: [0, 1]
Parti 2 = Kopya 2: [2, 3]
Parti 3 = Kopya 2: [4, 5]
Parti 4 = Kopya 2: [6, 7]
Parti 5 = Kopya 2: [8, 9]
Parti 6 = Kopya 2: [10, 11]
ön yükleme
Varsayılan olarak, tf.distribute
, kullanıcı tarafından sağlanan tf.data.Dataset
örneğinin sonuna bir önceden getirme dönüşümü ekler. buffer_size
olan önceden getirme dönüştürmesinin bağımsız değişkeni, eşitlemedeki kopyaların sayısına eşittir.
tf.distribute.Strategy.distribute_datasets_from_function
kullanım
Bu API, bir girdi işlevi alır ve bir tf.distribute.DistributedDataset
örneği döndürür. Kullanıcıların ilettiği giriş işlevi bir tf.distribute.InputContext
bağımsız değişkenine sahiptir ve bir tf.data.Dataset
örneği döndürmelidir. Bu API ile tf.distribute
, kullanıcının giriş işlevinden döndürülen tf.data.Dataset
örneğinde başka bir değişiklik yapmaz. Veri kümesini toplu hale getirmek ve parçalamak kullanıcının sorumluluğundadır. tf.distribute
, çalışanların her birinin CPU aygıtındaki giriş işlevini çağırır. Bu API, kullanıcıların kendi toplu işleme ve parçalama mantığını belirlemelerine izin vermenin yanı sıra, çok çalışanlı eğitim için kullanıldığında tf.distribute.Strategy.experimental_distribute_dataset
ile karşılaştırıldığında daha iyi ölçeklenebilirlik ve performans gösterir.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
tutucu8 l10n-yerINFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Özellikler
yığınlama
Giriş işlevinin dönüş değeri olan tf.data.Dataset
örneği, çoğaltma başına toplu iş boyutu kullanılarak toplu hale getirilmelidir. Çoğaltma başına toplu iş boyutu, genel toplu iş boyutunun eşitleme eğitiminde yer alan çoğaltma sayısına bölümüdür. Bunun nedeni, tf.distribute
her bir çalışanın CPU aygıtındaki giriş işlevini çağırmasıdır. Belirli bir çalışan üzerinde oluşturulan veri kümesi, o çalışandaki tüm kopyalar tarafından kullanıma hazır olmalıdır.
parçalama
Kullanıcının giriş işlevine dolaylı olarak argüman olarak iletilen tf.distribute.InputContext
nesnesi, başlık altında tf.distribute
tarafından oluşturulur. Çalışan sayısı, geçerli çalışan kimliği vb. hakkında bilgilere sahiptir. Bu giriş işlevi, tf.distribute.InputContext
nesnesinin parçası olan bu özellikleri kullanarak kullanıcı tarafından belirlenen ilkelere göre parçalamayı işleyebilir.
ön yükleme
tf.distribute
, kullanıcı tarafından sağlanan giriş işlevi tarafından döndürülen tf.data.Dataset
sonuna bir önceden getirme dönüşümü eklemez.
Dağıtılmış Yineleyiciler
Dağıtılmamış tf.data.Dataset
örneklerine benzer şekilde, üzerinde yineleme yapmak ve tf.distribute.DistributedDataset
içindeki öğelere erişmek için tf.distribute.DistributedDataset
örneklerinde bir yineleyici oluşturmanız gerekir. Aşağıdakiler, bir tf.distribute.DistributedIterator
oluşturabileceğiniz ve modelinizi eğitmek için kullanabileceğiniz yöntemlerdir:
kullanımlar
Döngü yapısı için bir Pythonic kullanın
tf.distribute.DistributedDataset
üzerinde yineleme yapmak için kullanıcı dostu bir Pythonic döngüsü kullanabilirsiniz. tf.distribute.DistributedIterator
öğesinden döndürülen öğeler, tek bir tf.Tensor
veya kopya başına bir değer içeren bir tf.distribute.DistributedValues
olabilir. Döngüyü bir tf.function
içine yerleştirmek bir performans artışı sağlayacaktır. Ancak, break
ve return
şu anda bir tf.function
içine yerleştirilmiş bir tf.distribute.DistributedDataset
üzerindeki bir döngü için desteklenmemektedir.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
tutucu10 l10n-yerINFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\020TensorDataset:29" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Açık bir iter
oluşturmak için yineleyiciyi kullanın
Bir tf.distribute.DistributedDataset
örneğindeki öğeleri yinelemek için, üzerindeki yineleme iter
kullanarak bir tf.distribute.DistributedIterator
oluşturabilirsiniz. Açık bir yineleyiciyle, sabit sayıda adım için yineleme yapabilirsiniz. Bir tf.distribute.DistributedIterator
örneğinden bir sonraki öğeyi almak için dist_iterator
, next(dist_iterator)
, dist_iterator.get_next()
veya dist_iterator.get_next_as_optional()
. İlk ikisi temelde aynıdır:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
tutucu12 l10n-yerLoss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
next()
veya tf.distribute.DistributedIterator.get_next()
ile, tf.distribute.DistributedIterator
sonuna ulaştıysa, bir OutOfRange hatası atılır. İstemci python tarafında hatayı yakalayabilir ve kontrol noktası ve değerlendirme gibi diğer işleri yapmaya devam edebilir. Ancak, şuna benzeyen bir ana bilgisayar eğitim döngüsü kullanıyorsanız (yani, tf.function
başına birden çok adım çalıştırın) bu çalışmaz:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
, adım gövdesini bir tf.range
içine sararak birden çok adım içerir. Bu durumda, döngüde bağımlılık olmadan farklı yinelemeler paralel olarak başlayabilir, bu nedenle önceki yinelemelerin hesaplanması bitmeden sonraki yinelemelerde bir OutOfRange hatası tetiklenebilir. Bir OutOfRange hatası atıldığında, fonksiyondaki tüm işlemler hemen sonlandırılacaktır. Bu, kaçınmak istediğiniz bir durumsa, OutOfRange hatası oluşturmayan bir alternatif tf.distribute.DistributedIterator.get_next_as_optional()
'dır. get_next_as_optional
, sonraki öğeyi içeren veya tf.distribute.DistributedIterator
sona ulaştıysa hiçbir değeri içermeyen bir tf.experimental.Optional
döndürür.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
tutucu15 l10n-yerWARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') 2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_0" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9 } } attr { key: "metadata" value { s: "\n\020RangeDataset:104" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
element_spec
özelliğini kullanma
Dağıtılmış bir veri kümesinin öğelerini bir tf.function
ve bir tf.TypeSpec
garantisi istiyorsanız, tf.function
input_signature
bağımsız değişkenini belirtebilirsiniz. Dağıtılmış bir veri kümesinin çıktısı tf.distribute.DistributedValues
ve bu, girişi tek bir cihaza veya birden çok cihaza gösterebilir. Bu dağıtılmış değere karşılık gelen tf.TypeSpec
almak için dağıtılmış veri kümesinin veya dağıtılmış yineleyici nesnesinin element_spec
özelliğini kullanabilirsiniz.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
tutucu17 l10n-yerINFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:122" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
Kısmi Partiler
Kullanıcıların oluşturduğu tf.data.Dataset
örnekleri, çoğaltma sayısına eşit olarak bölünemeyen toplu iş boyutları içerebildiğinde veya veri kümesi örneğinin önemliliği toplu iş boyutuna bölünemediğinde kısmi toplu işlerle karşılaşılır. Bu, veri kümesi birden çok kopyaya dağıtıldığında, bazı yineleyicilerdeki next
çağrının bir OutOfRangeError ile sonuçlanacağı anlamına gelir. Bu kullanım durumunu ele almak için tf.distribute
, işlenecek daha fazla verisi olmayan replikalarda 0 toplu iş boyutundaki sahte toplu grupları döndürür.
Tek çalışan durumu için, yineleyicideki bir next
çağrı tarafından veriler döndürülmezse, 0 toplu iş boyutunda sahte partiler oluşturulur ve veri kümesindeki gerçek verilerle birlikte kullanılır. Kısmi gruplar durumunda, son küresel veri grubu, sahte veri gruplarının yanı sıra gerçek verileri içerecektir. Verileri işlemek için durma koşulu artık kopyalardan herhangi birinin veriye sahip olup olmadığını kontrol eder. Replikaların hiçbirinde veri yoksa OutOfRange hatası verilir.
Çoklu çalışan durumu için, çalışanların her birine ilişkin verilerin varlığını temsil eden boole değeri, çapraz çoğaltma iletişimi kullanılarak toplanır ve bu, tüm çalışanların dağıtılmış veri kümesini işlemeyi bitirip bitirmediğini belirlemek için kullanılır. Bu, çalışanlar arası iletişimi içerdiğinden, ilgili bazı performans cezaları vardır.
uyarılar
Birden çok çalışan kurulumuyla
tf.distribute.Strategy.experimental_distribute_dataset
API'lerini kullanırken, kullanıcılar dosyalardan okuyan birtf.data.Dataset
.tf.data.experimental.AutoShardPolicy
,AUTO
veyaFILE
olarak ayarlanırsa, adım başına gerçek toplu iş boyutu, kullanıcı tanımlı genel toplu iş boyutundan daha küçük olabilir. Bu, dosyadaki kalan öğeler genel toplu iş boyutundan küçük olduğunda gerçekleşebilir. Kullanıcılar, çalıştırılacak adım sayısına bağlı olmadan veri kümesini tüketebilir veyatf.data.experimental.AutoShardPolicy
, üzerinde çalışmak içinDATA
olarak ayarlayabilir.Durum bilgisi olan veri kümesi dönüşümleri şu anda
tf.distribute
ile desteklenmemektedir ve veri kümesinin sahip olabileceği durum bilgisi olan işlemler şu anda yok sayılmaktadır. Örneğin, veri kümenizde bir görüntüyü döndürmek içinmap_fn
kullanan birtf.random.uniform
varsa, python işleminin yürütüldüğü yerel makinedeki duruma (yani rastgele tohuma) bağlı bir veri kümesi grafiğiniz olur.Varsayılan olarak devre dışı bırakılan deneysel
tf.data.experimental.OptimizationOptions
belirli bağlamlarda --tf.distribute
ile birlikte kullanıldığında olduğu gibi -- performans düşüşüne neden olabilir. Bunları yalnızca bir dağıtım ayarında iş yükünüzün performansından yararlandıklarını doğruladıktan sonra etkinleştirmelisiniz.Giriş işlem hattınızı genel olarak
tf.data
ile nasıl optimize edeceğinizi öğrenmek için lütfen bu kılavuza bakın. Birkaç ek ipucu:Birden fazla çalışanınız varsa ve bir veya daha fazla glob modeliyle eşleşen tüm dosyalardan bir veri kümesi oluşturmak için
tf.data.Dataset.list_files
kullanıyorsanız, her çalışanın dosyayı tutarlı bir şekilde parçalaması içinseed
bağımsız değişkenini veyashuffle=False
değerini ayarlamayı unutmayın.Giriş işlem hattınız hem verileri kayıt düzeyinde karıştırmayı hem de verileri ayrıştırmayı içeriyorsa, ayrıştırılmamış veriler ayrıştırılan verilerden önemli ölçüde büyük değilse (ki bu genellikle böyle değildir), aşağıdaki örnekte gösterildiği gibi önce karıştırın ve ardından ayrıştırın. Bu, bellek kullanımına ve performansına fayda sağlayabilir.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
buffer_size
öğelerinin dahili bir arabelleğini korur ve bu nedenlebuffer_size
azaltmak OOM sorununu alevlendirebilir.tf.distribute.experimental_distribute_dataset
veyatf.distribute.distribute_datasets_from_function
kullanılırken verilerin işçiler tarafından işlenme sırası garanti edilmez. Tahmini ölçeklendirmek içintf.distribute
kullanıyorsanız bu genellikle gereklidir. Bununla birlikte, partideki her öğe için bir dizin ekleyebilir ve çıktıları buna göre sipariş edebilirsiniz. Aşağıdaki kod parçası, çıktıların nasıl sipariş edileceğine bir örnektir.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
tutucu20 l10n-yerINFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46} 2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_4" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9223372036854775807 } } attr { key: "metadata" value { s: "\n\020RangeDataset:162" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } }
Kurallı bir tf.data.Dataset örneği kullanmıyorsam verilerimi nasıl dağıtırım?
Bazen kullanıcılar girdilerini temsil etmek için bir tf.data.Dataset
kullanamazlar ve ardından veri setini birden çok cihaza dağıtmak için yukarıda belirtilen API'leri kullanamazlar. Bu gibi durumlarda ham tensörleri veya bir jeneratörden gelen girdileri kullanabilirsiniz.
Rastgele tensör girişleri için deneysel_distribute_values_from_function kullanın
strategy.run
next(iterator)
çıktısı olan tf.distribute.DistributedValues
kabul eder. Tensör değerlerini iletmek için, ham tensörlerden tf.distribute.DistributedValues
oluşturmak için experimental_distribute_values_from_function
kullanın.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
tutucu22 l10n-yerINFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
Girişiniz bir jeneratörden geliyorsa tf.data.Dataset.from_generator kullanın
Kullanmak istediğiniz bir oluşturucu işleviniz varsa, from_generator
API'sini kullanarak bir tf.data.Dataset
örneği oluşturabilirsiniz.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
tutucu24 l10n-yerINFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2" op: "FlatMapDataset" input: "TensorDataset/_1" attr { key: "Targuments" value { list { } } } attr { key: "_cardinality" value { i: -2 } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_flat_map_fn_3980" } } } attr { key: "metadata" value { s: "\n\022FlatMapDataset:178" } } attr { key: "output_shapes" value { list { shape { dim { size: 4 } } } } } attr { key: "output_types" value { list { type: DT_FLOAT } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.