Wejście rozproszone

Zobacz na TensorFlow.org Uruchom w Google Colab Wyświetl źródło na GitHub Pobierz notatnik

Interfejsy API tf.distribute zapewniają użytkownikom łatwy sposób skalowania szkolenia z jednego komputera na wiele komputerów. Podczas skalowania modelu użytkownicy muszą również rozdzielić swoje dane wejściowe na wiele urządzeń. tf.distribute udostępnia interfejsy API, za pomocą których możesz automatycznie dystrybuować dane wejściowe na urządzenia.

Ten przewodnik pokaże Ci różne sposoby tworzenia rozproszonych zbiorów danych i iteratorów przy użyciu interfejsów API tf.distribute . Dodatkowo omówione zostaną następujące tematy:

Ten przewodnik nie obejmuje korzystania z rozproszonych danych wejściowych z interfejsami API Keras.

Rozproszone zbiory danych

Aby używać tf.distribute API do skalowania, zaleca się, aby użytkownicy używali tf.data.Dataset do reprezentowania ich danych wejściowych. tf.distribute został stworzony do wydajnej pracy z tf.data.Dataset (na przykład automatyczne pobieranie danych do każdego urządzenia akceleratora), a optymalizacja wydajności jest regularnie włączana do implementacji. Jeśli masz przypadek użycia czegoś innego niż tf.data.Dataset , zapoznaj się z kolejną sekcją tego przewodnika. W nierozproszonej pętli szkoleniowej użytkownicy najpierw tworzą instancję tf.data.Dataset , a następnie iterują elementy. Na przykład:

import tensorflow as tf

# Helper libraries
import numpy as np
import os

print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

# Iterate over the dataset using the for..in construct.
for inputs in dataset:
  print(train_step(inputs))
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Aby umożliwić użytkownikom korzystanie ze strategii tf.distribute z minimalnymi zmianami w istniejącym kodzie użytkownika, wprowadzono dwa interfejsy API, które rozpowszechniają instancję tf.data.Dataset i zwracają rozproszony obiekt zestawu danych. Użytkownik może następnie iterować tę instancję rozproszonego zestawu danych i trenować swój model tak jak poprzednio. Przyjrzyjmy się teraz dwóm interfejsom API — tf.distribute.Strategy.experimental_distribute_dataset i tf.distribute.Strategy.distribute_datasets_from_function :

tf.distribute.Strategy.experimental_distribute_dataset

Stosowanie

Ten interfejs API przyjmuje instancję tf.data.Dataset jako dane wejściowe i zwraca instancję tf.distribute.DistributedDataset . Wejściowy zestaw danych należy wsadowo o wartości równej globalnemu rozmiarowi wsadu. Ten globalny rozmiar partii to liczba próbek, które chcesz przetworzyć na wszystkich urządzeniach w jednym kroku. Możesz iterować po tym rozproszonym zbiorze danych w sposób Pythona lub utworzyć iterator za pomocą iter . Zwrócony obiekt nie jest instancją tf.data.Dataset i nie obsługuje żadnych innych interfejsów API, które w jakikolwiek sposób przekształcają lub sprawdzają zestaw danych. Jest to zalecany interfejs API, jeśli nie masz konkretnych sposobów dzielenia danych wejściowych na różne repliki.

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
(<tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy=
array([[1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.],
       [1.]], dtype=float32)>)
2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\017TensorDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}

Nieruchomości

Dozowanie

tf.distribute ponownie przetwarza wsadowe wystąpienie tf.data.Dataset z nowym rozmiarem wsadu równym globalnemu rozmiarowi wsadu podzielonemu przez liczbę zsynchronizowanych replik. Liczba zsynchronizowanych replik jest równa liczbie urządzeń biorących udział w gradiencie, który zmniejsza się podczas treningu. Gdy użytkownik wywołuje next w iteratorze rozproszonym, w każdej replice zwracany jest rozmiar partii danych na replikę. Kardynalność zestawu danych ze zmienioną partią zawsze będzie wielokrotnością liczby replik. Oto kilka przykładów:

  • tf.data.Dataset.range(6).batch(4, drop_remainder=False)

    • Bez dystrybucji:
    • Partia 1: [0, 1, 2, 3]
    • Partia 2: [4, 5]
    • Z dystrybucją ponad 2 replik. Ostatnia partia ([4, 5]) zostaje podzielona na 2 repliki.

    • Partia 1:

      • Replika 1:[0, 1]
      • Replika 2:[2, 3]
    • Partia 2:

      • Replika 2: [4]
      • Replika 2: [5]
  • tf.data.Dataset.range(4).batch(4)

    • Bez dystrybucji:
    • Partia 1: [[0], [1], [2], [3]]
    • Z dystrybucją ponad 5 replik:
    • Partia 1:
      • Replika 1: [0]
      • Replika 2: [1]
      • Replika 3: [2]
      • Replika 4: [3]
      • Replika 5: []
  • tf.data.Dataset.range(8).batch(4)

    • Bez dystrybucji:
    • Partia 1: [0, 1, 2, 3]
    • Partia 2: [4, 5, 6, 7]
    • Z dystrybucją ponad 3 replik:
    • Partia 1:
      • Replika 1: [0, 1]
      • Replika 2: [2, 3]
      • Replika 3: []
    • Partia 2:
      • Replika 1: [4, 5]
      • Replika 2: [6, 7]
      • Replika 3: []

Ponowne grupowanie zestawu danych ma złożoność przestrzeni, która rośnie liniowo wraz z liczbą replik. Oznacza to, że w przypadku użycia szkolenia z wieloma pracownikami potok wejściowy może napotkać błędy OOM.

Fragmentacja

tf.distribute również automatycznie sharduje wejściowy zestaw danych w szkoleniu dla wielu pracowników za pomocą MultiWorkerMirroredStrategy i TPUStrategy . Każdy zestaw danych jest tworzony na urządzeniu procesora pracownika. Automatyczne fragmentowanie zestawu danych przez zestaw pracowników oznacza, że ​​każdemu pracownikowi przypisywany jest podzbiór całego zestawu danych (jeśli ustawiono właściwy tf.data.experimental.AutoShardPolicy ). Ma to na celu zapewnienie, że na każdym kroku każdy pracownik przetwarza globalny rozmiar partii nienakładających się elementów zestawu danych. Autosharding ma kilka różnych opcji, które można określić przy użyciu tf.data.experimental.DistributeOptions . Należy zauważyć, że nie ma autoshardingu w przypadku uczenia wielu pracowników za pomocą ParameterServerStrategy , a więcej informacji na temat tworzenia zestawów danych za pomocą tej strategii można znaleźć w samouczku dotyczącym strategii serwera parametrów .

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)

Istnieją trzy różne opcje, które można ustawić dla tf.data.experimental.AutoShardPolicy :

  • AUTO: Jest to opcja domyślna, co oznacza, że ​​zostanie podjęta próba shardowania przez FILE. Próba fragmentowania przez FILE kończy się niepowodzeniem, jeśli nie zostanie wykryty zestaw danych oparty na pliku. tf.distribute powróci do shardingu przez DATA. Należy zauważyć, że jeśli wejściowy zestaw danych jest oparty na plikach, ale liczba plików jest mniejsza niż liczba procesów roboczych, zostanie zgłoszony InvalidArgumentError . W takim przypadku jawnie ustaw zasady na AutoShardPolicy.DATA lub podziel źródło danych wejściowych na mniejsze pliki, tak aby liczba plików była większa niż liczba procesów roboczych.
  • PLIK: Jest to opcja, jeśli chcesz podzielić pliki wejściowe na wszystkich pracowników. Należy użyć tej opcji, jeśli liczba plików wejściowych jest znacznie większa niż liczba procesów roboczych, a dane w plikach są równomiernie rozłożone. Wadą tej opcji jest bezczynność pracowników, jeśli dane w plikach nie są równomiernie rozłożone. Jeśli liczba plików jest mniejsza niż liczba pracowników, zostanie zgłoszony InvalidArgumentError . W takim przypadku jawnie ustaw zasady na AutoShardPolicy.DATA . Na przykład roześlijmy 2 pliki na 2 procesy robocze z 1 repliką każdy. Plik 1 zawiera [0, 1, 2, 3, 4, 5], a Plik 2 zawiera [6, 7, 8, 9, 10, 11]. Niech łączna liczba zsynchronizowanych replik wynosi 2, a globalny rozmiar partii wynosi 4.

    • Pracownik 0:
    • Partia 1 = Replika 1: [0, 1]
    • Partia 2 = Replika 1: [2, 3]
    • Partia 3 = Replika 1: [4]
    • Partia 4 = Replika 1: [5]
    • Pracownik 1:
    • Partia 1 = Replika 2: [6, 7]
    • Partia 2 = Replika 2: [8, 9]
    • Partia 3 = Replika 2: [10]
    • Partia 4 = Replika 2: [11]
  • DANE: Spowoduje to automatyczne shardowanie elementów we wszystkich robotnikach. Każdy z pracowników odczyta cały zbiór danych i przetworzy tylko przypisany do niego fragment. Wszystkie inne odłamki zostaną odrzucone. Jest to zwykle używane, jeśli liczba plików wejściowych jest mniejsza niż liczba pracowników i chcesz lepiej podzielić dane na fragmenty na wszystkich pracowników. Minusem jest to, że na każdym pracowniku zostanie odczytany cały zestaw danych. Na przykład rozdzielmy 1 plik na 2 pracowników. Plik 1 zawiera [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Niech łączna liczba zsynchronizowanych replik wynosi 2.

    • Pracownik 0:
    • Partia 1 = Replika 1: [0, 1]
    • Partia 2 = Replika 1: [4, 5]
    • Partia 3 = Replika 1: [8, 9]
    • Pracownik 1:
    • Partia 1 = Replika 2: [2, 3]
    • Partia 2 = Replika 2: [6, 7]
    • Partia 3 = Replika 2: [10, 11]
  • WYŁ: Jeśli wyłączysz autosharding, każdy pracownik będzie przetwarzał wszystkie dane. Na przykład rozdzielmy 1 plik na 2 pracowników. Plik 1 zawiera [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Niech łączna liczba zsynchronizowanych replik wynosi 2. Wtedy każdy pracownik zobaczy następujący rozkład:

    • Pracownik 0:
    • Partia 1 = Replika 1: [0, 1]
    • Partia 2 = Replika 1: [2, 3]
    • Partia 3 = Replika 1: [4, 5]
    • Partia 4 = Replika 1: [6, 7]
    • Partia 5 = Replika 1: [8, 9]
    • Partia 6 = Replika 1: [10, 11]

    • Pracownik 1:

    • Partia 1 = Replika 2: [0, 1]

    • Partia 2 = Replika 2: [2, 3]

    • Partia 3 = Replika 2: [4, 5]

    • Partia 4 = Replika 2: [6, 7]

    • Partia 5 = Replika 2: [8, 9]

    • Partia 6 = Replika 2: [10, 11]

Pobieranie z wyprzedzeniem

Domyślnie tf.distribute dodaje transformację pobierania wstępnego na końcu dostarczonej przez użytkownika instancji tf.data.Dataset . Argumentem transformacji pobierania wstępnego, którym jest buffer_size , jest liczba zsynchronizowanych replik.

tf.distribute.Strategy.distribute_datasets_from_function

Stosowanie

Ten interfejs API przyjmuje funkcję wejściową i zwraca instancję tf.distribute.DistributedDataset . Funkcja wejściowa przekazywana przez użytkowników ma argument tf.distribute.InputContext i powinna zwracać instancję tf.data.Dataset . Za pomocą tego interfejsu API tf.distribute nie wprowadza żadnych dalszych zmian w instancji tf.data.Dataset użytkownika zwróconej przez funkcję input. Obowiązkiem użytkownika jest wsadowe i fragmentowanie zestawu danych. tf.distribute wywołuje funkcję wejściową na urządzeniu procesora każdego z pracowników. Oprócz umożliwienia użytkownikom określania własnej logiki przetwarzania wsadowego i fragmentowania, ten interfejs API wykazuje również lepszą skalowalność i wydajność w porównaniu z tf.distribute.Strategy.experimental_distribute_dataset , gdy jest używany do uczenia wielu procesów roboczych.

mirrored_strategy = tf.distribute.MirroredStrategy()

def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
  dataset = dataset.shard(
    input_context.num_input_pipelines, input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
  return dataset

dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)

Nieruchomości

Dozowanie

tf.data.Dataset , które jest wartością zwracaną przez funkcję wejściową, powinno być przetwarzane wsadowo przy użyciu rozmiaru partii na replikę. Wielkość partii na replikę to globalny rozmiar partii podzielony przez liczbę replik, które biorą udział w szkoleniu synchronizacji. Dzieje się tak, ponieważ tf.distribute wywołuje funkcję wejściową na urządzeniu CPU każdego z pracowników. Zestaw danych utworzony na danym procesie roboczym powinien być gotowy do użycia przez wszystkie repliki w tym procesie roboczym.

Fragmentacja

Obiekt tf.distribute.InputContext , który jest niejawnie przekazywany jako argument do funkcji wejściowej użytkownika, jest tworzony przez tf.distribute pod maską. Zawiera informacje o liczbie pracowników, bieżącym identyfikatorze pracownika itp. Ta funkcja wejściowa może obsługiwać sharding zgodnie z zasadami ustawionymi przez użytkownika przy użyciu tych właściwości, które są częścią obiektu tf.distribute.InputContext .

Pobieranie z wyprzedzeniem

tf.distribute nie dodaje przekształcenia wstępnego pobierania na końcu zestawu tf.data.Dataset zwróconego przez funkcję wejściową podaną przez użytkownika.

Iteratory rozproszone

Podobnie jak w przypadku tf.data.Dataset instancji tf.data.Dataset, będziesz musiał utworzyć iterator na instancjach tf.distribute.DistributedDataset , aby wykonać iterację i uzyskać dostęp do elementów w tf.distribute.DistributedDataset . Poniżej przedstawiono sposoby tworzenia tf.distribute.DistributedIterator i używania ich do trenowania modelu:

zwyczaje

Użyj Pythonowej konstrukcji pętli

Możesz użyć przyjaznej dla użytkownika pętli Pythona do iteracji po tf.distribute.DistributedDataset . Elementy zwrócone z tf.distribute.DistributedIterator mogą być pojedynczym tf.Tensor lub tf.distribute.DistributedValues , który zawiera wartość na replikę. Umieszczenie pętli wewnątrz funkcji tf.function . spowoduje zwiększenie wydajności. Jednak break i return nie są obecnie obsługiwane w przypadku pętli nad tf.distribute.DistributedDataset umieszczonym wewnątrz tf.function .

global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function
def train_step(inputs):
  features, labels = inputs
  return labels - 0.3 * features

for x in dist_dataset:
  # train_step trains the model using the dataset elements
  loss = mirrored_strategy.run(train_step, args=(x,))
  print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020TensorDataset:29"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(4, 1), dtype=float32)

Użyj iter , aby utworzyć wyraźny iterator

Aby wykonać iterację po elementach w wystąpieniu tf.distribute.DistributedDataset , możesz utworzyć tf.distribute.DistributedIterator za pomocą interfejsu API iter . Za pomocą jawnego iteratora możesz iterować przez ustaloną liczbę kroków. Aby pobrać następny element z instancji tf.distribute.DistributedIterator dist_iterator , możesz wywołać next(dist_iterator) , dist_iterator.get_next() lub dist_iterator.get_next_as_optional() . Pierwsze dwa są zasadniczo takie same:

num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
  dist_iterator = iter(dist_dataset)
  for step in range(steps_per_epoch):
    # train_step trains the model using the dataset elements
    loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
    # which is the same as
    # loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
    print("Loss is ", loss)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)
Loss is  tf.Tensor(
[[0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]
 [0.7]], shape=(16, 1), dtype=float32)

W przypadku next() lub tf.distribute.DistributedIterator.get_next() , jeśli tf.distribute.DistributedIterator osiągnął swój koniec, zostanie zgłoszony błąd OutOfRange. Klient może wykryć błąd po stronie Pythona i kontynuować inne prace, takie jak sprawdzanie punktów i ocena. Jednak to nie zadziała, jeśli używasz pętli treningowej hosta (tj. Uruchom wiele kroków na tf.function ), która wygląda tak:

@tf.function
def train_fn(iterator):
  for _ in tf.range(steps_per_loop):
    strategy.run(step_fn, args=(next(iterator),))

train_fn zawiera wiele kroków poprzez zawinięcie treści kroku w tf.range . W takim przypadku różne iteracje w pętli bez zależności mogą rozpocząć się równolegle, więc błąd OutOfRange może zostać wyzwolony w późniejszych iteracjach przed zakończeniem obliczeń poprzednich iteracji. Po zgłoszeniu błędu OutOfRange wszystkie operacje w funkcji zostaną natychmiast zakończone. Jeśli jest to przypadek, którego chcesz uniknąć, alternatywą, która nie powoduje błędu OutOfRange, jest tf.distribute.DistributedIterator.get_next_as_optional() . get_next_as_optional zwraca tf.experimental.Optional , który zawiera następny element lub nie zawiera żadnej wartości, jeśli tf.distribute.DistributedIterator osiągnął koniec.

# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])

dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))

@tf.function
def train_fn(distributed_iterator):
  for _ in tf.range(steps_per_loop):
    optional_data = distributed_iterator.get_next_as_optional()
    if not optional_data.has_value():
      break
    per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
    tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce.
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0')
2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_0"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:104"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
([0 1], [2 3])
([4 5], [6 7])
([8], [])

Korzystanie z właściwości element_spec

Jeśli przekazujesz elementy rozproszonego zestawu danych do tf.function i potrzebujesz gwarancji tf.TypeSpec , możesz określić argument input_signature funkcji tf.function . Dane wyjściowe rozproszonego zestawu danych to tf.distribute.DistributedValues , które mogą reprezentować dane wejściowe dla pojedynczego urządzenia lub wielu urządzeń. Aby uzyskać tf.TypeSpec odpowiadające tej wartości rozproszonej, można użyć właściwości element_spec rozproszonego zestawu danych lub obiektu rozproszonego iteratora.

global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()

dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
  def step_fn(inputs):
    return 2 * inputs

  return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))

for _ in range(epochs):
  iterator = iter(dist_dataset)
  for _ in range(steps_per_epoch):
    output = train_step(next(iterator))
    tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2"
op: "TensorDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 1
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\021TensorDataset:122"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 1
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])
([[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]], [[1]
 [1]
 [1]
 ...
 [1]
 [1]
 [1]])

Partie częściowe

Partie częściowe są napotykane, gdy instancje tf.data.Dataset przez użytkowników mogą zawierać rozmiary partii, które nie są równomiernie podzielne przez liczbę replik lub gdy liczność wystąpienia zestawu danych nie jest podzielna przez rozmiar partii. Oznacza to, że gdy zestaw danych jest rozproszony w wielu replikach, next wywołanie niektórych iteratorów spowoduje wystąpienie błędu OutOfRangeError. Aby obsłużyć ten przypadek użycia, tf.distribute zwraca fikcyjne partie o rozmiarze partii 0 w replikach, które nie mają więcej danych do przetworzenia.

W przypadku pojedynczego procesu roboczego, jeśli dane nie zostaną zwrócone przez next wywołanie iteratora, tworzone są fikcyjne partie o wielkości partii 0 i używane wraz z rzeczywistymi danymi w zestawie danych. W przypadku partii częściowych, ostatnia globalna partia danych będzie zawierać dane rzeczywiste obok fikcyjnych partii danych. Warunek zatrzymania przetwarzania danych sprawdza teraz, czy którakolwiek z replik zawiera dane. Jeśli nie ma danych w żadnej z replik, zostanie zgłoszony błąd OutOfRange.

W przypadku wielu procesów roboczych wartość logiczna reprezentująca obecność danych na każdym z procesów roboczych jest agregowana przy użyciu komunikacji między replikami i jest używana do określenia, czy wszyscy pracownicy zakończyli przetwarzanie rozproszonego zestawu danych. Ponieważ wiąże się to z komunikacją między pracownikami, wiąże się to z pewnym spadkiem wydajności.

Zastrzeżenia

  • Korzystając z interfejsów API tf.distribute.Strategy.experimental_distribute_dataset z konfiguracją wielu procesów roboczych, użytkownicy przekazują tf.data.Dataset , który odczytuje pliki. Jeśli tf.data.experimental.AutoShardPolicy jest ustawiona na AUTO lub FILE , rzeczywisty rozmiar partii na krok może być mniejszy niż globalny rozmiar partii zdefiniowany przez użytkownika. Może się tak zdarzyć, gdy pozostałe elementy w pliku są mniejsze niż globalny rozmiar wsadu. Użytkownicy mogą wyczerpać zestaw danych bez zależności od liczby kroków do uruchomienia lub ustawić tf.data.experimental.AutoShardPolicy na DATA , aby go obejść.

  • Przekształcenia stanowego zestawu danych nie są obecnie obsługiwane przez tf.distribute , a wszelkie operacje stanowe, które może mieć zestaw danych, są obecnie ignorowane. Na przykład, jeśli twój zestaw danych ma map_fn , który używa tf.random.uniform do obracania obrazu, to masz wykres zestawu danych, który zależy od stanu (tj. losowego zalążka) na komputerze lokalnym, na którym wykonywany jest proces Pythona.

  • Eksperymentalna tf.data.experimental.OptimizationOptions , które są domyślnie wyłączone, mogą w niektórych kontekstach — na przykład w przypadku użycia razem z tf.distribute — spowodować pogorszenie wydajności. Należy je włączyć dopiero po sprawdzeniu, czy zwiększają wydajność obciążenia w ustawieniu dystrybucji.

  • Zapoznaj się z tym przewodnikiem , aby dowiedzieć się, jak ogólnie zoptymalizować potok danych wejściowych za pomocą tf.data . Kilka dodatkowych wskazówek:

    • Jeśli masz wiele procesów roboczych i używasz tf.data.Dataset.list_files do tworzenia zestawu danych ze wszystkich plików pasujących do jednego lub większej liczby wzorców globalnych, pamiętaj o ustawieniu argumentu seed lub ustawieniu shuffle=False , aby każdy proces roboczy konsekwentnie dzielił plik.

    • Jeśli potok wejściowy obejmuje zarówno tasowanie danych na poziomie rekordu, jak i analizowanie danych, chyba że nieprzeanalizowane dane są znacznie większe niż przeanalizowane dane (co zwykle nie ma miejsca), najpierw przetasuj, a następnie przeanalizuj, jak pokazano w poniższym przykładzie. Może to korzystnie wpłynąć na wykorzystanie pamięci i wydajność.

d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
  • tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None) utrzymuj wewnętrzny bufor elementów buffer_size , a tym samym zmniejszanie buffer_size może złagodzić problem OOM.

  • Nie jest gwarantowana kolejność przetwarzania danych przez pracowników podczas korzystania z tf.distribute.experimental_distribute_dataset lub tf.distribute.distribute_datasets_from_function . Jest to zwykle wymagane, jeśli używasz tf.distribute do przewidywania skali. Można jednak wstawić indeks dla każdego elementu w partii i odpowiednio uporządkować dane wyjściowe. Poniższy fragment kodu przedstawia przykład porządkowania wyników.

mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

def predict(index, inputs):
  outputs = 2 * inputs
  return index, outputs

result = {}
for index, inputs in dist_dataset:
  output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
  indices = list(mirrored_strategy.experimental_local_results(output_index))
  rindices = []
  for a in indices:
    rindices.extend(a.numpy())
  outputs = list(mirrored_strategy.experimental_local_results(outputs))
  routputs = []
  for a in outputs:
    routputs.extend(a.numpy())
  for i, value in zip(rindices, routputs):
    result[i] = value

print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
{0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46}
2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3"
op: "RangeDataset"
input: "Const/_4"
input: "Const/_1"
input: "Const/_2"
attr {
  key: "_cardinality"
  value {
    i: 9223372036854775807
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\020RangeDataset:162"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_INT64
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_INT64
        }
      }
    }
  }
}

Jak rozpowszechniać moje dane, jeśli nie używam kanonicznej instancji tf.data.Dataset?

Czasami użytkownicy nie mogą używać tf.data.Dataset do reprezentowania swoich danych wejściowych, a następnie wspomnianych wyżej interfejsów API do dystrybucji zestawu danych na wiele urządzeń. W takich przypadkach możesz użyć surowych tensorów lub wejść z generatora.

Użyj eksperymentalnej_wartości_rozpowszechniania_od_funkcji dla dowolnych danych wejściowych tensora

strategy.run akceptuje tf.distribute.DistributedValues , który jest wynikiem działania next(iterator) . Aby przekazać wartości tensorów, użyj experimental_distribute_values_from_function do skonstruowania tf.distribute.DistributedValues z surowych tensorów.

mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices

def value_fn(ctx):
  return tf.constant(1.0)

distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
  result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
  print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance.
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)
tf.Tensor(1.0, shape=(), dtype=float32)

Użyj tf.data.Dataset.from_generator, jeśli dane wejściowe pochodzą z generatora

Jeśli masz funkcję generatora, której chcesz użyć, możesz utworzyć instancję tf.data.Dataset za pomocą interfejsu API from_generator .

mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
  while True:
    yield np.random.rand(4)

# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
    input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
  mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2"
op: "FlatMapDataset"
input: "TensorDataset/_1"
attr {
  key: "Targuments"
  value {
    list {
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: -2
  }
}
attr {
  key: "f"
  value {
    func {
      name: "__inference_Dataset_flat_map_flat_map_fn_3980"
    }
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\022FlatMapDataset:178"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 4
        }
      }
    }
  }
}
attr {
  key: "output_types"
  value {
    list {
      type: DT_FLOAT
    }
  }
}
experimental_type {
  type_id: TFT_PRODUCT
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
  args {
    type_id: TFT_DATASET
    args {
      type_id: TFT_PRODUCT
      args {
        type_id: TFT_TENSOR
        args {
          type_id: TFT_FLOAT
        }
      }
    }
  }
}
. Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.