Lihat di TensorFlow.org | Jalankan di Google Colab | Lihat sumber di GitHub | Unduh buku catatan |
API tf.distribute menyediakan cara mudah bagi pengguna untuk menskalakan pelatihan mereka dari satu mesin ke beberapa mesin. Saat menskalakan model mereka, pengguna juga harus mendistribusikan input mereka ke beberapa perangkat. tf.distribute
menyediakan API yang dapat digunakan untuk mendistribusikan input Anda secara otomatis ke seluruh perangkat.
Panduan ini akan menunjukkan kepada Anda berbagai cara untuk membuat kumpulan data dan iterator terdistribusi menggunakan tf.distribute
API. Selain itu, topik-topik berikut akan dibahas:
- Opsi penggunaan, sharding, dan batching saat menggunakan
tf.distribute.Strategy.experimental_distribute_dataset
dantf.distribute.Strategy.distribute_datasets_from_function
. - Berbagai cara di mana Anda dapat mengulangi set data terdistribusi.
- Perbedaan antara
tf.distribute.Strategy.experimental_distribute_dataset
/tf.distribute.Strategy.distribute_datasets_from_function
API dantf.data
API serta batasan yang mungkin ditemui pengguna dalam penggunaannya.
Panduan ini tidak mencakup penggunaan input terdistribusi dengan Keras API.
Kumpulan Data Terdistribusi
Untuk menggunakan tf.distribute
API untuk menskalakan, disarankan agar pengguna menggunakan tf.data.Dataset
untuk mewakili input mereka. tf.distribute
telah dibuat untuk bekerja secara efisien dengan tf.data.Dataset
(misalnya, pengambilan data otomatis ke setiap perangkat akselerator) dengan optimalisasi kinerja yang secara teratur dimasukkan ke dalam implementasi. Jika Anda memiliki kasus penggunaan untuk menggunakan sesuatu selain tf.data.Dataset
, silakan lihat bagian selanjutnya dalam panduan ini. Dalam loop pelatihan yang tidak terdistribusi, pengguna membuat instance tf.data.Dataset
terlebih dahulu, lalu mengulangi elemen tersebut. Sebagai contoh:
import tensorflow as tf
# Helper libraries
import numpy as np
import os
print(tf.__version__)
2.8.0-rc1
global_batch_size = 16
# Create a tf.data.Dataset object.
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
# Iterate over the dataset using the for..in construct.
for inputs in dataset:
print(train_step(inputs))
tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Untuk memungkinkan pengguna menggunakan strategi tf.distribute
dengan sedikit perubahan pada kode pengguna yang ada, dua API diperkenalkan yang akan mendistribusikan instance tf.data.Dataset
dan mengembalikan objek kumpulan data terdistribusi. Pengguna kemudian dapat mengulangi instance set data terdistribusi ini dan melatih model mereka seperti sebelumnya. Sekarang mari kita lihat dua API - tf.distribute.Strategy.experimental_distribute_dataset
dan tf.distribute.Strategy.distribute_datasets_from_function
secara lebih rinci:
tf.distribute.Strategy.experimental_distribute_dataset
Penggunaan
API ini mengambil instance tf.data.Dataset
sebagai input dan mengembalikan instance tf.distribute.DistributedDataset
. Anda harus mengelompokkan kumpulan data input dengan nilai yang sama dengan ukuran kumpulan global. Ukuran batch global ini adalah jumlah sampel yang ingin Anda proses di semua perangkat dalam 1 langkah. Anda dapat mengulangi set data terdistribusi ini dengan cara Pythonic atau membuat iterator menggunakan iter
. Objek yang dikembalikan bukan instance tf.data.Dataset
dan tidak mendukung API lain yang mengubah atau memeriksa set data dengan cara apa pun. Ini adalah API yang direkomendasikan jika Anda tidak memiliki cara khusus untuk membagi input Anda ke replika yang berbeda.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
# Distribute input using the `experimental_distribute_dataset`.
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
# 1 global batch of data fed to the model in 1 step.
print(next(iter(dist_dataset)))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) (<tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>, <tf.Tensor: shape=(16, 1), dtype=float32, numpy= array([[1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.], [1.]], dtype=float32)>) 2022-01-26 05:34:05.342660: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\017TensorDataset:4" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } }
Properti
Batching
tf.distribute
melakukan rebatch instance tf.data.Dataset
input dengan ukuran batch baru yang sama dengan ukuran batch global dibagi dengan jumlah replika yang disinkronkan. Jumlah replika yang disinkronkan sama dengan jumlah perangkat yang mengambil bagian dalam pengurangan gradien selama pelatihan. Saat pengguna memanggil next
pada iterator terdistribusi, ukuran kumpulan data per replika dikembalikan pada setiap replika. Kardinalitas kumpulan data ulang akan selalu menjadi kelipatan dari jumlah replika. Berikut adalah beberapa contoh:
tf.data.Dataset.range(6).batch(4, drop_remainder=False)
- Tanpa distribusi:
- Gelombang 1: [0, 1, 2, 3]
- Gelombang 2: [4, 5]
Dengan distribusi lebih dari 2 replika. Batch terakhir ([4, 5]) dibagi menjadi 2 replika.
Angkatan 1:
- Replika 1[0, 1]
- Replika 2:[2, 3]
Angkatan 2:
- Replika 2: [4]
- Replika 2: [5]
tf.data.Dataset.range(4).batch(4)
- Tanpa distribusi:
- Gelombang 1: [[0], [1], [2], [3]]
- Dengan distribusi lebih dari 5 replika:
- Angkatan 1:
- Replika 1: [0]
- Replika 2: [1]
- Replika 3: [2]
- Replika 4: [3]
- Replika 5: []
tf.data.Dataset.range(8).batch(4)
- Tanpa distribusi:
- Gelombang 1: [0, 1, 2, 3]
- Gelombang 2: [4, 5, 6, 7]
- Dengan distribusi lebih dari 3 replika:
- Angkatan 1:
- Replika 1: [0, 1]
- Replika 2: [2, 3]
- Replika 3: []
- Angkatan 2:
- Replika 1: [4, 5]
- Replika 2: [6, 7]
- Replika 3: []
Rebatching dataset memiliki kompleksitas ruang yang meningkat secara linier dengan jumlah replika. Ini berarti bahwa untuk kasus penggunaan pelatihan multi-pekerja, pipa input dapat mengalami kesalahan OOM.
pecahan
tf.distribute
juga mengotomatiskan dataset input dalam pelatihan multi-pekerja dengan MultiWorkerMirroredStrategy
dan TPUStrategy
. Setiap kumpulan data dibuat pada perangkat CPU pekerja. Autosharding set data di atas sekumpulan pekerja berarti bahwa setiap pekerja diberi subset dari seluruh set data (jika tf.data.experimental.AutoShardPolicy
yang benar disetel). Ini untuk memastikan bahwa pada setiap langkah, ukuran kumpulan global dari elemen kumpulan data yang tidak tumpang tindih akan diproses oleh setiap pekerja. Autosharding memiliki beberapa opsi berbeda yang dapat ditentukan menggunakan tf.data.experimental.DistributeOptions
. Perhatikan bahwa tidak ada autosharding dalam pelatihan multi-pekerja dengan ParameterServerStrategy
, dan informasi lebih lanjut tentang pembuatan kumpulan data dengan strategi ini dapat ditemukan di tutorial Strategi Parameter Server .
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)
Ada tiga opsi berbeda yang dapat Anda atur untuk tf.data.experimental.AutoShardPolicy
:
- AUTO: Ini adalah opsi default yang berarti akan dilakukan upaya sharding oleh FILE. Upaya sharding oleh FILE gagal jika kumpulan data berbasis file tidak terdeteksi.
tf.distribute
kemudian akan kembali ke sharding oleh DATA. Perhatikan bahwa jika dataset input berbasis file tetapi jumlah file kurang dari jumlah pekerja,InvalidArgumentError
akan dimunculkan. Jika ini terjadi, setel kebijakan secara eksplisit keAutoShardPolicy.DATA
, atau pisahkan sumber input Anda menjadi file yang lebih kecil sehingga jumlah file lebih besar daripada jumlah pekerja. FILE: Ini adalah opsi jika Anda ingin membagi file input ke semua pekerja. Anda harus menggunakan opsi ini jika jumlah file input jauh lebih besar daripada jumlah pekerja dan data dalam file didistribusikan secara merata. Kelemahan dari opsi ini adalah memiliki pekerja yang menganggur jika data dalam file tidak terdistribusi secara merata. Jika jumlah file kurang dari jumlah pekerja,
InvalidArgumentError
akan dimunculkan. Jika ini terjadi, setel kebijakan secara eksplisit keAutoShardPolicy.DATA
. Misalnya, mari kita distribusikan 2 file ke 2 pekerja dengan masing-masing 1 replika. File 1 berisi [0, 1, 2, 3, 4, 5] dan File 2 berisi [6, 7, 8, 9, 10, 11]. Biarkan jumlah total replika yang disinkronkan menjadi 2 dan ukuran kumpulan global menjadi 4.- pekerja 0:
- Batch 1 = Replika 1: [0, 1]
- Batch 2 = Replika 1: [2, 3]
- Batch 3 = Replika 1: [4]
- Batch 4 = Replika 1: [5]
- Pekerja 1:
- Batch 1 = Replika 2: [6, 7]
- Batch 2 = Replika 2: [8, 9]
- Batch 3 = Replika 2: [10]
- Batch 4 = Replika 2: [11]
DATA: Ini akan autoshard elemen di semua pekerja. Setiap pekerja akan membaca seluruh kumpulan data dan hanya memproses pecahan yang ditugaskan padanya. Semua pecahan lainnya akan dibuang. Ini umumnya digunakan jika jumlah file input kurang dari jumlah pekerja dan Anda ingin pemisahan data yang lebih baik di semua pekerja. Kelemahannya adalah seluruh dataset akan dibaca pada setiap pekerja. Misalnya, mari kita mendistribusikan 1 file ke 2 pekerja. File 1 berisi [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Biarkan jumlah total replika yang sinkron menjadi 2.
- pekerja 0:
- Batch 1 = Replika 1: [0, 1]
- Batch 2 = Replika 1: [4, 5]
- Batch 3 = Replika 1: [8, 9]
- Pekerja 1:
- Batch 1 = Replika 2: [2, 3]
- Batch 2 = Replika 2: [6, 7]
- Batch 3 = Replika 2: [10, 11]
OFF: Jika Anda mematikan autosharding, setiap pekerja akan memproses semua data. Misalnya, mari kita mendistribusikan 1 file ke 2 pekerja. File 1 berisi [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]. Biarkan jumlah total replika yang disinkronkan menjadi 2. Kemudian setiap pekerja akan melihat distribusi berikut:
- pekerja 0:
- Batch 1 = Replika 1: [0, 1]
- Batch 2 = Replika 1: [2, 3]
- Batch 3 = Replika 1: [4, 5]
- Batch 4 = Replika 1: [6, 7]
- Batch 5 = Replika 1: [8, 9]
Batch 6 = Replika 1: [10, 11]
Pekerja 1:
Batch 1 = Replika 2: [0, 1]
Batch 2 = Replika 2: [2, 3]
Batch 3 = Replika 2: [4, 5]
Batch 4 = Replika 2: [6, 7]
Batch 5 = Replika 2: [8, 9]
Batch 6 = Replika 2: [10, 11]
Mengambil terlebih dahulu
Secara default, tf.distribute
menambahkan transformasi prefetch di akhir instance tf.data.Dataset
yang disediakan pengguna. Argumen untuk transformasi prefetch yaitu buffer_size
sama dengan jumlah replika yang disinkronkan.
tf.distribute.Strategy.distribute_datasets_from_function
Penggunaan
API ini mengambil fungsi input dan mengembalikan instance tf.distribute.DistributedDataset
. Fungsi input yang diteruskan pengguna memiliki argumen tf.distribute.InputContext
dan harus mengembalikan instance tf.data.Dataset
. Dengan API ini, tf.distribute
tidak membuat perubahan lebih lanjut pada instance tf.data.Dataset
pengguna yang dikembalikan dari fungsi input. Merupakan tanggung jawab pengguna untuk mengelompokkan dan membagi kumpulan data. tf.distribute
memanggil fungsi input pada perangkat CPU dari masing-masing pekerja. Selain memungkinkan pengguna untuk menentukan logika batching dan sharding mereka sendiri, API ini juga menunjukkan skalabilitas dan kinerja yang lebih baik dibandingkan dengan tf.distribute.Strategy.experimental_distribute_dataset
saat digunakan untuk pelatihan multi-pekerja.
mirrored_strategy = tf.distribute.MirroredStrategy()
def dataset_fn(input_context):
batch_size = input_context.get_per_replica_batch_size(global_batch_size)
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(64).batch(16)
dataset = dataset.shard(
input_context.num_input_pipelines, input_context.input_pipeline_id)
dataset = dataset.batch(batch_size)
dataset = dataset.prefetch(2) # This prefetches 2 batches per device.
return dataset
dist_dataset = mirrored_strategy.distribute_datasets_from_function(dataset_fn)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',)
Properti
Batching
tf.data.Dataset
yang merupakan nilai kembalian dari fungsi input harus di-batch menggunakan ukuran batch per replika. Ukuran kumpulan per replika adalah ukuran kumpulan global dibagi dengan jumlah replika yang mengikuti pelatihan sinkronisasi. Ini karena tf.distribute
memanggil fungsi input pada perangkat CPU dari masing-masing pekerja. Kumpulan data yang dibuat pada pekerja tertentu harus siap digunakan oleh semua replika pada pekerja tersebut.
pecahan
Objek tf.distribute.InputContext
yang secara implisit diteruskan sebagai argumen ke fungsi input pengguna dibuat oleh tf.distribute
di bawah tenda. Ini memiliki informasi tentang jumlah pekerja, id pekerja saat ini dll. Fungsi input ini dapat menangani sharding sesuai kebijakan yang ditetapkan oleh pengguna menggunakan properti ini yang merupakan bagian dari objek tf.distribute.InputContext
.
Mengambil terlebih dahulu
tf.distribute
tidak menambahkan transformasi prefetch di akhir tf.data.Dataset
yang dikembalikan oleh fungsi input yang disediakan pengguna.
Iterator Terdistribusi
Mirip dengan tf.data.Dataset
yang tidak terdistribusi, Anda perlu membuat iterator pada tf.distribute.DistributedDataset
untuk mengulanginya dan mengakses elemen dalam tf.distribute.DistributedDataset
. Berikut ini adalah cara Anda dapat membuat tf.distribute.DistributedIterator
dan menggunakannya untuk melatih model Anda:
penggunaan
Gunakan konstruksi Pythonic untuk loop
Anda dapat menggunakan loop Pythonic yang mudah digunakan untuk mengulangi tf.distribute.DistributedDataset
. Elemen yang dikembalikan dari tf.distribute.DistributedIterator
dapat berupa tf.Tensor
tunggal atau tf.distribute.DistributedValues
yang berisi nilai per replika. Menempatkan loop di dalam tf.function
akan memberikan peningkatan kinerja. Namun, break
dan return
saat ini tidak didukung untuk loop pada tf.distribute.DistributedDataset
yang ditempatkan di dalam tf.function
.
global_batch_size = 16
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function
def train_step(inputs):
features, labels = inputs
return labels - 0.3 * features
for x in dist_dataset:
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(x,))
print("Loss is ", loss)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:05.431113: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\020TensorDataset:29" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7]], shape=(4, 1), dtype=float32)
Gunakan iter
untuk membuat iterator eksplisit
Untuk mengulangi elemen dalam instance tf.distribute.DistributedDataset
, Anda dapat membuat tf.distribute.DistributedIterator
menggunakan API iter
di dalamnya. Dengan iterator eksplisit, Anda dapat mengulangi sejumlah langkah yang tetap. Untuk mendapatkan elemen berikutnya dari instance tf.distribute.DistributedIterator
dist_iterator
, Anda dapat memanggil next(dist_iterator)
, dist_iterator.get_next()
, atau dist_iterator.get_next_as_optional()
. Dua yang pertama pada dasarnya sama:
num_epochs = 10
steps_per_epoch = 5
for epoch in range(num_epochs):
dist_iterator = iter(dist_dataset)
for step in range(steps_per_epoch):
# train_step trains the model using the dataset elements
loss = mirrored_strategy.run(train_step, args=(next(dist_iterator),))
# which is the same as
# loss = mirrored_strategy.run(train_step, args=(dist_iterator.get_next(),))
print("Loss is ", loss)
Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32) Loss is tf.Tensor( [[0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7] [0.7]], shape=(16, 1), dtype=float32)
Dengan next()
atau tf.distribute.DistributedIterator.get_next()
, jika tf.distribute.DistributedIterator
telah mencapai akhir, kesalahan OutOfRange akan muncul. Klien dapat menangkap kesalahan di sisi python dan terus melakukan pekerjaan lain seperti pos pemeriksaan dan evaluasi. Namun, ini tidak akan berfungsi jika Anda menggunakan loop pelatihan host (yaitu, jalankan beberapa langkah per tf.function
), yang terlihat seperti:
@tf.function
def train_fn(iterator):
for _ in tf.range(steps_per_loop):
strategy.run(step_fn, args=(next(iterator),))
train_fn
berisi beberapa langkah dengan membungkus badan langkah di dalam tf.range
. Dalam hal ini, iterasi yang berbeda dalam loop tanpa ketergantungan dapat dimulai secara paralel, sehingga kesalahan OutOfRange dapat dipicu pada iterasi selanjutnya sebelum perhitungan iterasi sebelumnya selesai. Setelah kesalahan OutOfRange dilemparkan, semua operasi dalam fungsi akan segera dihentikan. Jika ini adalah kasus yang ingin Anda hindari, alternatif yang tidak menimbulkan kesalahan OutOfRange adalah tf.distribute.DistributedIterator.get_next_as_optional()
. get_next_as_optional
mengembalikan tf.experimental.Optional
yang berisi elemen berikutnya atau tidak ada nilai jika tf.distribute.DistributedIterator
telah berakhir.
# You can break the loop with get_next_as_optional by checking if the Optional contains value
global_batch_size = 4
steps_per_loop = 5
strategy = tf.distribute.MirroredStrategy(devices=["GPU:0", "CPU:0"])
dataset = tf.data.Dataset.range(9).batch(global_batch_size)
distributed_iterator = iter(strategy.experimental_distribute_dataset(dataset))
@tf.function
def train_fn(distributed_iterator):
for _ in tf.range(steps_per_loop):
optional_data = distributed_iterator.get_next_as_optional()
if not optional_data.has_value():
break
per_replica_results = strategy.run(lambda x:x, args=(optional_data.get_value(),))
tf.print(strategy.experimental_local_results(per_replica_results))
train_fn(distributed_iterator)
WARNING:tensorflow:There are non-GPU devices in `tf.distribute.Strategy`, not using nccl allreduce. INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0', '/job:localhost/replica:0/task:0/device:CPU:0') 2022-01-26 05:34:07.300202: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_0" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9 } } attr { key: "metadata" value { s: "\n\020RangeDataset:104" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } } 2022-01-26 05:34:07.355301: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations. ([0 1], [2 3]) ([4 5], [6 7]) ([8], [])
Menggunakan properti element_spec
Jika Anda meneruskan elemen set data terdistribusi ke tf.function
dan menginginkan jaminan tf.TypeSpec
, Anda dapat menentukan argumen input_signature
dari tf.function
. Output dari dataset terdistribusi adalah tf.distribute.DistributedValues
yang dapat mewakili input ke satu perangkat atau beberapa perangkat. Untuk mendapatkan tf.TypeSpec
yang sesuai dengan nilai terdistribusi ini, Anda dapat menggunakan properti element_spec
dari dataset terdistribusi atau objek iterator terdistribusi.
global_batch_size = 16
epochs = 5
steps_per_epoch = 5
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset = tf.data.Dataset.from_tensors(([1.],[1.])).repeat(100).batch(global_batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
@tf.function(input_signature=[dist_dataset.element_spec])
def train_step(per_replica_inputs):
def step_fn(inputs):
return 2 * inputs
return mirrored_strategy.run(step_fn, args=(per_replica_inputs,))
for _ in range(epochs):
iterator = iter(dist_dataset)
for _ in range(steps_per_epoch):
output = train_step(next(iterator))
tf.print(output)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:07.611498: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorDataset/_2" op: "TensorDataset" input: "Placeholder/_0" input: "Placeholder/_1" attr { key: "Toutput_types" value { list { type: DT_FLOAT type: DT_FLOAT } } } attr { key: "_cardinality" value { i: 1 } } attr { key: "metadata" value { s: "\n\021TensorDataset:122" } } attr { key: "output_shapes" value { list { shape { dim { size: 1 } } shape { dim { size: 1 } } } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]]) ([[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]], [[1] [1] [1] ... [1] [1] [1]])
Batch Sebagian
Batch sebagian ditemui saat instance tf.data.Dataset
yang dibuat pengguna mungkin berisi ukuran batch yang tidak dapat dibagi secara merata dengan jumlah replika atau ketika kardinalitas instance dataset tidak dapat dibagi dengan ukuran batch. Ini berarti bahwa ketika kumpulan data didistribusikan melalui beberapa replika, panggilan next
pada beberapa iterator akan menghasilkan OutOfRangeError. Untuk menangani kasus penggunaan ini, tf.distribute
mengembalikan kumpulan dummy ukuran kumpulan 0 pada replika yang tidak memiliki data lagi untuk diproses.
Untuk kasus pekerja tunggal, jika data tidak dikembalikan oleh panggilan next
pada iterator, kumpulan dummy dengan ukuran kumpulan 0 dibuat dan digunakan bersama dengan data nyata dalam kumpulan data. Dalam kasus kumpulan parsial, kumpulan data global terakhir akan berisi data nyata bersama kumpulan data dummy. Kondisi berhenti untuk memproses data sekarang memeriksa apakah ada replika yang memiliki data. Jika tidak ada data pada salah satu replika, kesalahan OutOfRange akan muncul.
Untuk kasus multi pekerja, nilai boolean yang mewakili keberadaan data pada setiap pekerja dikumpulkan menggunakan komunikasi replika silang dan ini digunakan untuk mengidentifikasi apakah semua pekerja telah selesai memproses kumpulan data terdistribusi. Karena ini melibatkan komunikasi lintas pekerja, ada beberapa hukuman kinerja yang terlibat.
Peringatan
Saat menggunakan
tf.distribute.Strategy.experimental_distribute_dataset
API dengan beberapa pengaturan pekerja, pengguna meneruskantf.data.Dataset
yang membaca dari file. Jikatf.data.experimental.AutoShardPolicy
diatur keAUTO
atauFILE
, ukuran batch aktual per langkah mungkin lebih kecil dari ukuran batch global yang ditentukan pengguna. Ini dapat terjadi jika elemen yang tersisa dalam file lebih kecil dari ukuran batch global. Pengguna dapat menghabiskan kumpulan data tanpa bergantung pada jumlah langkah yang harus dijalankan atau menyeteltf.data.experimental.AutoShardPolicy
keDATA
untuk mengatasinya.Transformasi set data stateful saat ini tidak didukung dengan
tf.distribute
dan operasi stateful apa pun yang mungkin dimiliki set data saat ini diabaikan. Misalnya, jika kumpulan data Anda memilikimap_fn
yang menggunakantf.random.uniform
untuk memutar gambar, maka Anda memiliki grafik kumpulan data yang bergantung pada status (yaitu benih acak) pada mesin lokal tempat proses python dijalankan.Eksperimental
tf.data.experimental.OptimizationOptions
yang dinonaktifkan secara default dapat dalam konteks tertentu -- seperti bila digunakan bersama dengantf.distribute
-- menyebabkan penurunan kinerja. Anda hanya boleh mengaktifkannya setelah Anda memvalidasi bahwa mereka menguntungkan kinerja beban kerja Anda dalam pengaturan distribusi.Silakan merujuk ke panduan ini untuk cara mengoptimalkan saluran input Anda dengan
tf.data
secara umum. Beberapa tips tambahan:Jika Anda memiliki beberapa pekerja dan menggunakan
tf.data.Dataset.list_files
untuk membuat kumpulan data dari semua file yang cocok dengan satu atau beberapa pola glob, ingatlah untuk menyetel argumenseed
atau setelshuffle=False
sehingga setiap pekerja melakukan shard file secara konsisten.Jika saluran input Anda mencakup pengacakan data pada tingkat rekaman dan penguraian data, kecuali jika data yang tidak diuraikan secara signifikan lebih besar dari data yang diuraikan (yang biasanya tidak demikian), acak terlebih dahulu lalu parsing, seperti yang ditunjukkan dalam contoh berikut. Ini dapat menguntungkan penggunaan dan kinerja memori.
d = tf.data.Dataset.list_files(pattern, shuffle=False)
d = d.shard(num_workers, worker_index)
d = d.repeat(num_epochs)
d = d.shuffle(shuffle_buffer_size)
d = d.interleave(tf.data.TFRecordDataset,
cycle_length=num_readers, block_length=1)
d = d.map(parser_fn, num_parallel_calls=num_map_threads)
tf.data.Dataset.shuffle(buffer_size, seed=None, reshuffle_each_iteration=None)
mempertahankan buffer internal elemenbuffer_size
, dan dengan demikian mengurangibuffer_size
dapat mengurangi masalah OOM.Urutan pemrosesan data oleh pekerja saat menggunakan
tf.distribute.experimental_distribute_dataset
atautf.distribute.distribute_datasets_from_function
tidak dijamin. Ini biasanya diperlukan jika Anda menggunakantf.distribute
untuk menskalakan prediksi. Namun Anda dapat memasukkan indeks untuk setiap elemen dalam batch dan memesan output yang sesuai. Cuplikan berikut adalah contoh cara mengurutkan output.
mirrored_strategy = tf.distribute.MirroredStrategy()
dataset_size = 24
batch_size = 6
dataset = tf.data.Dataset.range(dataset_size).enumerate().batch(batch_size)
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
def predict(index, inputs):
outputs = 2 * inputs
return index, outputs
result = {}
for index, inputs in dist_dataset:
output_index, outputs = mirrored_strategy.run(predict, args=(index, inputs))
indices = list(mirrored_strategy.experimental_local_results(output_index))
rindices = []
for a in indices:
rindices.extend(a.numpy())
outputs = list(mirrored_strategy.experimental_local_results(outputs))
routputs = []
for a in outputs:
routputs.extend(a.numpy())
for i, value in zip(rindices, routputs):
result[i] = value
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18, 10: 20, 11: 22, 12: 24, 13: 26, 14: 28, 15: 30, 16: 32, 17: 34, 18: 36, 19: 38, 20: 40, 21: 42, 22: 44, 23: 46} 2022-01-26 05:34:08.978884: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "RangeDataset/_3" op: "RangeDataset" input: "Const/_4" input: "Const/_1" input: "Const/_2" attr { key: "_cardinality" value { i: 9223372036854775807 } } attr { key: "metadata" value { s: "\n\020RangeDataset:162" } } attr { key: "output_shapes" value { list { shape { } } } } attr { key: "output_types" value { list { type: DT_INT64 } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_INT64 } } } } }
Bagaimana cara mendistribusikan data saya jika saya tidak menggunakan instans tf.data.Dataset kanonik?
Terkadang pengguna tidak dapat menggunakan tf.data.Dataset
untuk mewakili input mereka dan selanjutnya API yang disebutkan di atas untuk mendistribusikan dataset ke beberapa perangkat. Dalam kasus seperti itu, Anda dapat menggunakan tensor mentah atau input dari generator.
Gunakan eksperimental_distribute_values_from_function untuk input tensor arbitrer
strategy.run
menerima tf.distribute.DistributedValues
yang merupakan output dari next(iterator)
. Untuk meneruskan nilai tensor, gunakan experimental_distribute_values_from_function
untuk membuat tf.distribute.DistributedValues
dari tensor mentah.
mirrored_strategy = tf.distribute.MirroredStrategy()
worker_devices = mirrored_strategy.extended.worker_devices
def value_fn(ctx):
return tf.constant(1.0)
distributed_values = mirrored_strategy.experimental_distribute_values_from_function(value_fn)
for _ in range(4):
result = mirrored_strategy.run(lambda x:x, args=(distributed_values,))
print(result)
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) WARNING:tensorflow:Using MirroredStrategy eagerly has significant overhead currently. We will be working on improving this in the future, but for now please wrap `call_for_each_replica` or `experimental_run` or `run` inside a tf.function to get the best performance. tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32) tf.Tensor(1.0, shape=(), dtype=float32)
Gunakan tf.data.Dataset.from_generator jika input Anda berasal dari generator
Jika Anda memiliki fungsi generator yang ingin digunakan, Anda dapat membuat instance tf.data.Dataset
menggunakan from_generator
API.
mirrored_strategy = tf.distribute.MirroredStrategy()
def input_gen():
while True:
yield np.random.rand(4)
# use Dataset.from_generator
dataset = tf.data.Dataset.from_generator(
input_gen, output_types=(tf.float32), output_shapes=tf.TensorShape([4]))
dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)
iterator = iter(dist_dataset)
for _ in range(4):
mirrored_strategy.run(lambda x:x, args=(next(iterator),))
INFO:tensorflow:Using MirroredStrategy with devices ('/job:localhost/replica:0/task:0/device:GPU:0',) 2022-01-26 05:34:09.091386: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:776] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Did not find a shardable source, walked to a node which is not a dataset: name: "FlatMapDataset/_2" op: "FlatMapDataset" input: "TensorDataset/_1" attr { key: "Targuments" value { list { } } } attr { key: "_cardinality" value { i: -2 } } attr { key: "f" value { func { name: "__inference_Dataset_flat_map_flat_map_fn_3980" } } } attr { key: "metadata" value { s: "\n\022FlatMapDataset:178" } } attr { key: "output_shapes" value { list { shape { dim { size: 4 } } } } } attr { key: "output_types" value { list { type: DT_FLOAT } } } experimental_type { type_id: TFT_PRODUCT args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } args { type_id: TFT_DATASET args { type_id: TFT_PRODUCT args { type_id: TFT_TENSOR args { type_id: TFT_FLOAT } } } } } . Consider either turning off auto-sharding or switching the auto_shard_policy to DATA to shard this dataset. You can do this by creating a new `tf.data.Options()` object then setting `options.experimental_distribute.auto_shard_policy = AutoShardPolicy.DATA` before applying the options object to the dataset via `dataset.with_options(options)`.