Zobacz na TensorFlow.org | Uruchom w Google Colab | Wyświetl źródło na GitHub | Pobierz notatnik |
Interfejsy API tf.distribute zapewniają użytkownikom łatwy sposób skalowania szkolenia z jednego komputera na wiele komputerów. Podczas skalowania modelu użytkownicy muszą również rozdzielić swoje dane wejściowe na wiele urządzeń. tf.distribute
udostępnia interfejsy API, za pomocą których możesz automatycznie dystrybuować dane wejściowe na urządzenia.
Ten przewodnik pokaże Ci różne sposoby tworzenia rozproszonych zbiorów danych i iteratorów przy użyciu interfejsów API tf.distribute
. Dodatkowo omówione zostaną następujące tematy:
- Opcje użycia, shardingu i tworzenia partii podczas korzystania z
tf.distribute.Strategy.experimental_distribute_dataset
itf.distribute.Strategy.distribute_datasets_from_function
. - Różne sposoby iteracji w rozproszonym zestawie danych.
- Różnice między
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
API itf.data
API, a także wszelkie ograniczenia, na jakie użytkownicy mogą natknąć się podczas ich użytkowania.
Ten przewodnik nie obejmuje korzystania z rozproszonych danych wejściowych z interfejsami API Keras.
Rozproszone zbiory danych
Aby używać tf.distribute
API do skalowania, zaleca się, aby użytkownicy używali tf.data.Dataset
do reprezentowania ich danych wejściowych. tf.distribute
został stworzony do wydajnej pracy z tf.data.Dataset
(na przykład automatyczne pobieranie danych do każdego urządzenia akceleratora), a optymalizacja wydajności jest regularnie włączana do implementacji. Jeśli masz przypadek użycia czegoś innego niż tf.data.Dataset
, zapoznaj się z kolejną sekcją tego przewodnika. W nierozproszonej pętli szkoleniowej użytkownicy najpierw tworzą instancję tf.data.Dataset
, a następnie iterują elementy. Na przykład:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Aby umożliwić użytkownikom korzystanie ze strategii tf.distribute
z minimalnymi zmianami w istniejącym kodzie użytkownika, wprowadzono dwa interfejsy API, które rozpowszechniają instancję tf.data.Dataset
i zwracają rozproszony obiekt zestawu danych. Użytkownik może następnie iterować tę instancję rozproszonego zestawu danych i trenować swój model tak jak poprzednio. Przyjrzyjmy się teraz dwóm interfejsom API — tf.distribute.Strategy.experimental_distribute_dataset
i tf.distribute.Strategy.distribute_datasets_from_function
:
tf.distribute.Strategy.experimental_distribute_dataset
Stosowanie
Ten interfejs API przyjmuje instancję tf.data.Dataset
jako dane wejściowe i zwraca instancję tf.distribute.DistributedDataset
. Wejściowy zestaw danych należy wsadowo o wartości równej globalnemu rozmiarowi wsadu. Ten globalny rozmiar partii to liczba próbek, które chcesz przetworzyć na wszystkich urządzeniach w jednym kroku. Możesz iterować po tym rozproszonym zbiorze danych w sposób Pythona lub utworzyć iterator za pomocą iter
. Zwrócony obiekt nie jest instancją tf.data.Dataset
i nie obsługuje żadnych innych interfejsów API, które w jakikolwiek sposób przekształcają lub sprawdzają zestaw danych. Jest to zalecany interfejs API, jeśli nie masz konkretnych sposobów dzielenia danych wejściowych na różne repliki.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>) 2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
Nieruchomości
Dozowanie
tf.distribute
ponownie przetwarza wsadowe wystąpienie tf.data.Dataset
z nowym rozmiarem wsadu równym globalnemu rozmiarowi wsadu podzielonemu przez liczbę zsynchronizowanych replik. Liczba zsynchronizowanych replik jest równa liczbie urządzeń biorących udział w gradiencie, który zmniejsza się podczas treningu. Gdy użytkownik wywołuje next
w iteratorze rozproszonym, w każdej replice zwracany jest rozmiar partii danych na replikę. Kardynalność zestawu danych ze zmienioną partią zawsze będzie wielokrotnością liczby replik. Oto kilka przykładów:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- Bez dystrybucji:
- Partia 1: [0, 1, 2, 3]
- Partia 2: [4, 5]
Z dystrybucją ponad 2 replik. Ostatnia partia ([4, 5]) zostaje podzielona na 2 repliki.
Partia 1:
- Replika 1:[0, 1]
- Replika 2:[2, 3]
Partia 2:
- Replika 2: [4]
- Replika 2: [5]
tf.data.Dataset.range(4).batch(4)
- Bez dystrybucji:
- Partia 1: [[0], [1], [2], [3]]
- Z dystrybucją ponad 5 replik:
- Partia 1:
- Replika 1: [0]
- Replika 2: [1]
- Replika 3: [2]
- Replika 4: [3]
- Replika 5: []
tf.data.Dataset.range(8).batch(4)
- Bez dystrybucji:
- Partia 1: [0, 1, 2, 3]
- Partia 2: [4, 5, 6, 7]
- Z dystrybucją ponad 3 replik:
- Partia 1:
- Replika 1: [0, 1]
- Replika 2: [2, 3]
- Replika 3: []
- Partia 2:
- Replika 1: [4, 5]
- Replika 2: [6, 7]
- Replika 3: []
Ponowne grupowanie zestawu danych ma złożoność przestrzeni, która rośnie liniowo wraz z liczbą replik. Oznacza to, że w przypadku użycia szkolenia z wieloma pracownikami potok wejściowy może napotkać błędy OOM.
Fragmentacja
tf.distribute
również automatycznie sharduje wejściowy zestaw danych w szkoleniu dla wielu pracowników za pomocą MultiWorkerMirroredStrategy
i TPUStrategy
. Każdy zestaw danych jest tworzony na urządzeniu procesora pracownika. Automatyczne fragmentowanie zestawu danych przez zestaw pracowników oznacza, że każdemu pracownikowi przypisywany jest podzbiór całego zestawu danych (jeśli ustawiono właściwy tf.data.experimental.AutoShardPolicy
). Ma to na celu zapewnienie, że na każdym kroku każdy pracownik przetwarza globalny rozmiar partii nienakładających się elementów zestawu danych. Autosharding ma kilka różnych opcji, które można określić przy użyciu tf.data.experimental.DistributeOptions
. Należy zauważyć, że nie ma autoshardingu w przypadku uczenia wielu pracowników za pomocą ParameterServerStrategy
, a więcej informacji na temat tworzenia zestawów danych za pomocą tej strategii można znaleźć w samouczku dotyczącym strategii serwera parametrów .
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
Istnieją trzy różne opcje, które można ustawić dla tf.data.experimental.AutoShardPolicy
:
- AUTO: Jest to opcja domyślna, co oznacza, że zostanie podjęta próba shardowania przez FILE. Próba fragmentowania przez FILE kończy się niepowodzeniem, jeśli nie zostanie wykryty zestaw danych oparty na pliku.
tf.distribute
powróci do shardingu przez DATA. Należy zauważyć, że jeśli wejściowy zestaw danych jest oparty na plikach, ale liczba plików jest mniejsza niż liczba procesów roboczych, zostanie zgłoszonyInvalidArgumentError
. W takim przypadku jawnie ustaw zasady naAutoShardPolicy.DATA
lub podziel źródło danych wejściowych na mniejsze pliki, tak aby liczba plików była większa niż liczba procesów roboczych. PLIK: Jest to opcja, jeśli chcesz podzielić pliki wejściowe na wszystkich pracowników. Należy użyć tej opcji, jeśli liczba plików wejściowych jest znacznie większa niż liczba procesów roboczych, a dane w plikach są równomiernie rozłożone. Wadą tej opcji jest bezczynność pracowników, jeśli dane w plikach nie są równomiernie rozłożone. Jeśli liczba plików jest mniejsza niż liczba pracowników, zostanie zgłoszony
InvalidArgumentError
. W takim przypadku jawnie ustaw zasady naAutoShardPolicy.DATA
. Na przykład roześlijmy 2 pliki na 2 procesy robocze z 1 repliką każdy. Plik 1 zawiera [0, 1, 2, 3, 4, 5], a Plik 2 zawiera [6, 7, 8, 9, 10, 11]. Niech łączna liczba zsynchronizowanych replik wynosi 2, a globalny rozmiar partii wynosi 4.- Pracownik 0:
- Partia 1 = Replika 1: [0, 1]
- Partia 2 = Replika 1: [2, 3]
- Partia 3 = Replika 1: [4]
- Partia 4 = Replika 1: [5]
- Pracownik 1:
- Partia 1 = Replika 2: [6, 7]
- Partia 2 = Replika 2: [8, 9]
- Partia 3 = Replika 2: [10]
- Partia 4 = Replika 2: [11]
DANE: Spowoduje to automatyczne shardowanie elementów we wszystkich robotnikach. Każdy z pracowników odczyta cały zbiór danych i przetworzy tylko przypisany do niego fragment. Wszystkie inne odłamki zostaną odrzucone. Jest to zwykle używane, jeśli liczba plików wejściowych jest mniejsza niż liczba pracowników i chcesz lepiej podzielić dane na fragmenty na wszystkich pracowników. Minusem jest to, że na każdym pracowniku zostanie odczytany cały zestaw danych. Na przykład rozdzielmy 1 plik na 2 pracowników. Plik 1 zawiera [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Niech łączna liczba zsynchronizowanych replik wynosi 2.
- Pracownik 0:
- Partia 1 = Replika 1: [0, 1]
- Partia 2 = Replika 1: [4, 5]
- Partia 3 = Replika 1: [8, 9]
- Pracownik 1:
- Partia 1 = Replika 2: [2, 3]
- Partia 2 = Replika 2: [6, 7]
- Partia 3 = Replika 2: [10, 11]
WYŁ: Jeśli wyłączysz autosharding, każdy pracownik będzie przetwarzał wszystkie dane. Na przykład rozdzielmy 1 plik na 2 pracowników. Plik 1 zawiera [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Niech łączna liczba zsynchronizowanych replik wynosi 2. Wtedy każdy pracownik zobaczy następujący rozkład:
- Pracownik 0:
- Partia 1 = Replika 1: [0, 1]
- Partia 2 = Replika 1: [2, 3]
- Partia 3 = Replika 1: [4, 5]
- Partia 4 = Replika 1: [6, 7]
- Partia 5 = Replika 1: [8, 9]
Partia 6 = Replika 1: [10, 11]
Pracownik 1:
Partia 1 = Replika 2: [0, 1]
Partia 2 = Replika 2: [2, 3]
Partia 3 = Replika 2: [4, 5]
Partia 4 = Replika 2: [6, 7]
Partia 5 = Replika 2: [8, 9]
Partia 6 = Replika 2: [10, 11]
Pobieranie z wyprzedzeniem
Domyślnie tf.distribute
dodaje transformację pobierania wstępnego na końcu dostarczonej przez użytkownika instancji tf.data.Dataset
. Argumentem transformacji pobierania wstępnego, którym jest buffer_size
, jest liczba zsynchronizowanych replik.
tf.distribute.Strategy.distribute_datasets_from_function
Stosowanie
Ten interfejs API przyjmuje funkcję wejściową i zwraca instancję tf.distribute.DistributedDataset
. Funkcja wejściowa przekazywana przez użytkowników ma argument tf.distribute.InputContext
i powinna zwracać instancję tf.data.Dataset
. Za pomocą tego interfejsu API tf.distribute
nie wprowadza żadnych dalszych zmian w instancji tf.data.Dataset
użytkownika zwróconej przez funkcję input. Obowiązkiem użytkownika jest wsadowe i fragmentowanie zestawu danych. tf.distribute
wywołuje funkcję wejściową na urządzeniu procesora każdego z pracowników. Oprócz umożliwienia użytkownikom określania własnej logiki przetwarzania wsadowego i fragmentowania, ten interfejs API wykazuje również lepszą skalowalność i wydajność w porównaniu z tf.distribute.Strategy.experimental_distribute_dataset
, gdy jest używany do uczenia wielu procesów roboczych.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Nieruchomości
Dozowanie
tf.data.Dataset
, które jest wartością zwracaną przez funkcję wejściową, powinno być przetwarzane wsadowo przy użyciu rozmiaru partii na replikę. Wielkość partii na replikę to globalny rozmiar partii podzielony przez liczbę replik, które biorą udział w szkoleniu synchronizacji. Dzieje się tak, ponieważ tf.distribute
wywołuje funkcję wejściową na urządzeniu CPU każdego z pracowników. Zestaw danych utworzony na danym procesie roboczym powinien być gotowy do użycia przez wszystkie repliki w tym procesie roboczym.
Fragmentacja
Obiekt tf.distribute.InputContext
, który jest niejawnie przekazywany jako argument do funkcji wejściowej użytkownika, jest tworzony przez tf.distribute
pod maską. Zawiera informacje o liczbie pracowników, bieżącym identyfikatorze pracownika itp. Ta funkcja wejściowa może obsługiwać sharding zgodnie z zasadami ustawionymi przez użytkownika przy użyciu tych właściwości, które są częścią obiektu tf.distribute.InputContext
.
Pobieranie z wyprzedzeniem
tf.distribute
nie dodaje przekształcenia wstępnego pobierania na końcu zestawu tf.data.Dataset
zwróconego przez funkcję wejściową podaną przez użytkownika.
Iteratory rozproszone
Podobnie jak w przypadku tf.data.Dataset
instancji tf.data.Dataset, będziesz musiał utworzyć iterator na instancjach tf.distribute.DistributedDataset
, aby wykonać iterację i uzyskać dostęp do elementów w tf.distribute.DistributedDataset
. Poniżej przedstawiono sposoby tworzenia tf.distribute.DistributedIterator
i używania ich do trenowania modelu:
zwyczaje
Użyj Pythonowej konstrukcji pętli
Możesz użyć przyjaznej dla użytkownika pętli Pythona do iteracji po tf.distribute.DistributedDataset
. Elementy zwrócone z tf.distribute.DistributedIterator
mogą być pojedynczym tf.Tensor
lub tf.distribute.DistributedValues
, który zawiera wartość na replikę. Umieszczenie pętli wewnątrz funkcji tf.function
. spowoduje zwiększenie wydajności. Jednak break
i return
nie są obecnie obsługiwane w przypadku pętli nad tf.distribute.DistributedDataset
umieszczonym wewnątrz tf.function
.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\020TensorDataset:29" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Użyj iter
, aby utworzyć wyraźny iterator
Aby wykonać iterację po elementach w wystąpieniu tf.distribute.DistributedDataset
, możesz utworzyć tf.distribute.DistributedIterator
za pomocą interfejsu API iter
. Za pomocą jawnego iteratora możesz iterować przez ustaloną liczbę kroków. Aby pobrać następny element z instancji tf.distribute.DistributedIterator
dist_iterator
, możesz wywołać next(dist_iterator)
, dist_iterator.get_next()
lub dist_iterator.get_next_as_optional()
. Pierwsze dwa są zasadniczo takie same:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
W przypadku next()
lub tf.distribute.DistributedIterator.get_next()
, jeśli tf.distribute.DistributedIterator
osiągnął swój koniec, zostanie zgłoszony błąd OutOfRange. Klient może wykryć błąd po stronie Pythona i kontynuować inne prace, takie jak sprawdzanie punktów i ocena. Jednak to nie zadziała, jeśli używasz pętli treningowej hosta (tj. Uruchom wiele kroków na tf.function
), która wygląda tak:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
zawiera wiele kroków poprzez zawinięcie treści kroku w tf.range
. W takim przypadku różne iteracje w pętli bez zależności mogą rozpocząć się równolegle, więc błąd OutOfRange może zostać wyzwolony w późniejszych iteracjach przed zakończeniem obliczeń poprzednich iteracji. Po zgłoszeniu błędu OutOfRange wszystkie operacje w funkcji zostaną natychmiast zakończone. Jeśli jest to przypadek, którego chcesz uniknąć, alternatywą, która nie powoduje błędu OutOfRange, jest tf.distribute.DistributedIterator.get_next_as_optional()
. get_next_as_optional
zwraca tf.experimental.Optional
, który zawiera następny element lub nie zawiera żadnej wartości, jeśli tf.distribute.DistributedIterator
osiągnął koniec.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') 2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_0" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9 } } attr { key: "metadata" value { s: "\n\020RangeDataset:104" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
Korzystanie z właściwości element_spec
Jeśli przekazujesz elementy rozproszonego zestawu danych do tf.function
i potrzebujesz gwarancji tf.TypeSpec
, możesz określić argument input_signature
funkcji tf.function
. Dane wyjściowe rozproszonego zestawu danych to tf.distribute.DistributedValues
, które mogą reprezentować dane wejściowe dla pojedynczego urządzenia lub wielu urządzeń. Aby uzyskać tf.TypeSpec
odpowiadające tej wartości rozproszonej, można użyć właściwości element_spec
rozproszonego zestawu danych lub obiektu rozproszonego iteratora.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:122" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
Partie częściowe
Partie częściowe są napotykane, gdy instancje tf.data.Dataset
przez użytkowników mogą zawierać rozmiary partii, które nie są równomiernie podzielne przez liczbę replik lub gdy liczność wystąpienia zestawu danych nie jest podzielna przez rozmiar partii. Oznacza to, że gdy zestaw danych jest rozproszony w wielu replikach, next
wywołanie niektórych iteratorów spowoduje wystąpienie błędu OutOfRangeError. Aby obsłużyć ten przypadek użycia, tf.distribute
zwraca fikcyjne partie o rozmiarze partii 0 w replikach, które nie mają więcej danych do przetworzenia.
W przypadku pojedynczego procesu roboczego, jeśli dane nie zostaną zwrócone przez next
wywołanie iteratora, tworzone są fikcyjne partie o wielkości partii 0 i używane wraz z rzeczywistymi danymi w zestawie danych. W przypadku partii częściowych, ostatnia globalna partia danych będzie zawierać dane rzeczywiste obok fikcyjnych partii danych. Warunek zatrzymania przetwarzania danych sprawdza teraz, czy którakolwiek z replik zawiera dane. Jeśli nie ma danych w żadnej z replik, zostanie zgłoszony błąd OutOfRange.
W przypadku wielu procesów roboczych wartość logiczna reprezentująca obecność danych na każdym z procesów roboczych jest agregowana przy użyciu komunikacji między replikami i jest używana do określenia, czy wszyscy pracownicy zakończyli przetwarzanie rozproszonego zestawu danych. Ponieważ wiąże się to z komunikacją między pracownikami, wiąże się to z pewnym spadkiem wydajności.
Zastrzeżenia
Korzystając z interfejsów API
tf.distribute.Strategy.experimental_distribute_dataset
z konfiguracją wielu procesów roboczych, użytkownicy przekazujątf.data.Dataset
, który odczytuje pliki. Jeślitf.data.experimental.AutoShardPolicy
jest ustawiona naAUTO
lubFILE
, rzeczywisty rozmiar partii na krok może być mniejszy niż globalny rozmiar partii zdefiniowany przez użytkownika. Może się tak zdarzyć, gdy pozostałe elementy w pliku są mniejsze niż globalny rozmiar wsadu. Użytkownicy mogą wyczerpać zestaw danych bez zależności od liczby kroków do uruchomienia lub ustawićtf.data.experimental.AutoShardPolicy
naDATA
, aby go obejść.Przekształcenia stanowego zestawu danych nie są obecnie obsługiwane przez
tf.distribute
, a wszelkie operacje stanowe, które może mieć zestaw danych, są obecnie ignorowane. Na przykład, jeśli twój zestaw danych mamap_fn
, który używatf.random.uniform
do obracania obrazu, to masz wykres zestawu danych, który zależy od stanu (tj. losowego zalążka) na komputerze lokalnym, na którym wykonywany jest proces Pythona.Eksperymentalna
tf.data.experimental.OptimizationOptions
, które są domyślnie wyłączone, mogą w niektórych kontekstach — na przykład w przypadku użycia razem ztf.distribute
— spowodować pogorszenie wydajności. Należy je włączyć dopiero po sprawdzeniu, czy zwiększają wydajność obciążenia w ustawieniu dystrybucji.Zapoznaj się z tym przewodnikiem , aby dowiedzieć się, jak ogólnie zoptymalizować potok danych wejściowych za pomocą
tf.data
. Kilka dodatkowych wskazówek:Jeśli masz wiele procesów roboczych i używasz
tf.data.Dataset.list_files
do tworzenia zestawu danych ze wszystkich plików pasujących do jednego lub większej liczby wzorców globalnych, pamiętaj o ustawieniu argumentuseed
lub ustawieniushuffle=False
, aby każdy proces roboczy konsekwentnie dzielił plik.Jeśli potok wejściowy obejmuje zarówno tasowanie danych na poziomie rekordu, jak i analizowanie danych, chyba że nieprzeanalizowane dane są znacznie większe niż przeanalizowane dane (co zwykle nie ma miejsca), najpierw przetasuj, a następnie przeanalizuj, jak pokazano w poniższym przykładzie. Może to korzystnie wpłynąć na wykorzystanie pamięci i wydajność.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
utrzymuj wewnętrzny bufor elementówbuffer_size
, a tym samym zmniejszaniebuffer_size
może złagodzić problem OOM.Nie jest gwarantowana kolejność przetwarzania danych przez pracowników podczas korzystania z
tf.distribute.experimental_distribute_dataset
lubtf.distribute.distribute_datasets_from_function
. Jest to zwykle wymagane, jeśli używasztf.distribute
do przewidywania skali. Można jednak wstawić indeks dla każdego elementu w partii i odpowiednio uporządkować dane wyjściowe. Poniższy fragment kodu przedstawia przykład porządkowania wyników.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46} 2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_4" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9223372036854775807 } } attr { key: "metadata" value { s: "\n\020RangeDataset:162" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } }
Jak rozpowszechniać moje dane, jeśli nie używam kanonicznej instancji tf.data.Dataset?
Czasami użytkownicy nie mogą używać tf.data.Dataset
do reprezentowania swoich danych wejściowych, a następnie wspomnianych wyżej interfejsów API do dystrybucji zestawu danych na wiele urządzeń. W takich przypadkach możesz użyć surowych tensorów lub wejść z generatora.
Użyj eksperymentalnej_wartości_rozpowszechniania_od_funkcji dla dowolnych danych wejściowych tensora
strategy.run
akceptuje tf.distribute.DistributedValues
, który jest wynikiem działania next(iterator)
. Aby przekazać wartości tensorów, użyj experimental_distribute_values_from_function
do skonstruowania tf.distribute.DistributedValues
z surowych tensorów.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
Użyj tf.data.Dataset.from_generator, jeśli dane wejściowe pochodzą z generatora
Jeśli masz funkcję generatora, której chcesz użyć, możesz utworzyć instancję tf.data.Dataset
za pomocą interfejsu API from_generator
.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2" op: "FlatMapDataset" input: "TensorDataset/_1" attr { key: "Targuments" value { list { } } } attr { key: "_cardinality" value { i: -2 } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_flat_map_fn_3980" } } } attr { key: "metadata" value { s: "\n\022FlatMapDataset:178" } } attr { key: "output_shapes" value { list { shape { dim { size: 4 } } } } } attr { key: "output_types" value { list { type: DT_FLOAT } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.