از TPU ها استفاده کنید

مشاهده در TensorFlow.org در Google Colab اجرا شود مشاهده منبع در GitHub دانلود دفترچه یادداشت

قبل از اجرای این نوت بوک Colab، با بررسی تنظیمات نوت بوک، مطمئن شوید که شتاب دهنده سخت افزاری شما TPU است: زمان اجرا > تغییر نوع زمان اجرا > شتاب دهنده سخت افزار > TPU .

برپایی

import tensorflow as tf

import os
import tensorflow_datasets as tfds
/tmpfs/src/tf_docs_env/lib/python3.6/site-packages/requests/__init__.py:104: RequestsDependencyWarning: urllib3 (1.26.8) or chardet (2.3.0)/charset_normalizer (2.0.11) doesn't match a supported version!
  RequestsDependencyWarning)

مقداردهی اولیه TPU

TPU ها معمولاً کارگران Cloud TPU هستند که با فرآیند محلی اجرای برنامه پایتون کاربر متفاوت هستند. بنابراین، برای اتصال به خوشه راه دور و مقداردهی اولیه TPU ها باید مقداری کار اولیه را انجام دهید. توجه داشته باشید که آرگومان tpu به tf.distribute.cluster_resolver.TPUClusterResolver یک آدرس خاص فقط برای Colab است. اگر کد خود را در موتور محاسباتی Google (GCE) اجرا می‌کنید، در عوض باید نام Cloud TPU خود را ارسال کنید.

resolver = tf.distribute.cluster_resolver.TPUClusterResolver(tpu='')
tf.config.experimental_connect_to_cluster(resolver)
# This is the TPU initialization code that has to be at the beginning.
tf.tpu.experimental.initialize_tpu_system(resolver)
print("All devices: ", tf.config.list_logical_devices('TPU'))
INFO:tensorflow:Clearing out eager caches
INFO:tensorflow:Clearing out eager caches
INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.10:8470
INFO:tensorflow:Initializing the TPU system: grpc://10.240.1.10:8470
INFO:tensorflow:Finished initializing TPU system.
INFO:tensorflow:Finished initializing TPU system.
All devices:  [LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:0', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:1', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:2', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:3', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:4', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:5', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:6', device_type='TPU'), LogicalDevice(name='/job:worker/replica:0/task:0/device:TPU:7', device_type='TPU')]

قرار دادن دستگاه به صورت دستی

پس از مقداردهی اولیه TPU، می توانید از قرار دادن دستی دستگاه برای قرار دادن محاسبات روی یک دستگاه TPU استفاده کنید:

a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])

with tf.device('/TPU:0'):
  c = tf.matmul(a, b)

print("c device: ", c.device)
print(c)
c device:  /job:worker/replica:0/task:0/device:TPU:0
tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32)

استراتژی های توزیع

معمولاً شما مدل خود را روی چندین TPU به صورت موازی داده اجرا می کنید. برای توزیع مدل خود بر روی چندین TPU (یا سایر شتاب دهنده ها)، TensorFlow چندین استراتژی توزیع ارائه می دهد. شما می توانید استراتژی توزیع خود را جایگزین کنید و مدل بر روی هر دستگاه (TPU) مشخصی اجرا می شود. برای اطلاعات بیشتر راهنمای استراتژی توزیع را بررسی کنید.

برای نشان دادن این موضوع، یک شی tf.distribute.TPUStrategy ایجاد کنید:

strategy = tf.distribute.TPUStrategy(resolver)
INFO:tensorflow:Found TPU system:
INFO:tensorflow:Found TPU system:
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Cores: 8
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Workers: 1
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Num TPU Cores Per Worker: 8
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:localhost/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:CPU:0, CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:0, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:1, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:2, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:3, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:4, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:5, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:6, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU:7, TPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:TPU_SYSTEM:0, TPU_SYSTEM, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)
INFO:tensorflow:*** Available Device: _DeviceAttributes(/job:worker/replica:0/task:0/device:XLA_CPU:0, XLA_CPU, 0, 0)

برای تکرار یک محاسبات به طوری که بتواند در تمام هسته های TPU اجرا شود، می توانید آن را به API strategy.run .run منتقل کنید. در زیر نمونه‌ای وجود دارد که نشان می‌دهد همه هسته‌ها ورودی‌های یکسان (a, b) را دریافت می‌کنند و به طور مستقل ضرب ماتریس را روی هر هسته انجام می‌دهند. خروجی ها مقادیر همه کپی ها خواهند بود.

@tf.function
def matmul_fn(x, y):
  z = tf.matmul(x, y)
  return z

z = strategy.run(matmul_fn, args=(a, b))
print(z)
PerReplica:{
  0: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  1: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  2: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  3: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  4: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  5: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  6: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32),
  7: tf.Tensor(
[[22. 28.]
 [49. 64.]], shape=(2, 2), dtype=float32)
}

طبقه بندی در TPU ها

پس از پوشش مفاهیم اساسی، یک مثال ملموس تر را در نظر بگیرید. این بخش نحوه استفاده از استراتژی توزیع - tf.distribute.TPUStrategy - برای آموزش یک مدل Keras در یک TPU ابری را نشان می‌دهد.

مدل کراس را تعریف کنید

با تعریف یک مدل Keras Sequential برای طبقه بندی تصویر در مجموعه داده MNIST با استفاده از Keras شروع کنید. تفاوتی با آنچه که در صورت آموزش روی CPU یا GPU استفاده می کردید، ندارد. توجه داشته باشید که ایجاد مدل Keras باید در داخل strategy.scope باشد، بنابراین متغیرها را می توان در هر دستگاه TPU ایجاد کرد. سایر بخش‌های کد لازم نیست در محدوده استراتژی قرار گیرند.

def create_model():
  return tf.keras.Sequential(
      [tf.keras.layers.Conv2D(256, 3, activation='relu', input_shape=(28, 28, 1)),
       tf.keras.layers.Conv2D(256, 3, activation='relu'),
       tf.keras.layers.Flatten(),
       tf.keras.layers.Dense(256, activation='relu'),
       tf.keras.layers.Dense(128, activation='relu'),
       tf.keras.layers.Dense(10)])

مجموعه داده را بارگیری کنید

استفاده کارآمد از tf.data.Dataset API هنگام استفاده از Cloud TPU بسیار مهم است، زیرا استفاده از TPU های Cloud غیرممکن است مگر اینکه بتوانید به اندازه کافی سریع داده ها را به آنها بدهید. می‌توانید در راهنمای عملکرد خط لوله ورودی اطلاعات بیشتری درباره عملکرد مجموعه داده کسب کنید.

برای همه آزمایش‌ها به جز ساده‌ترین آزمایش‌ها (با استفاده از tf.data.Dataset.from_tensor_slices یا سایر داده‌های درون نمودار)، باید همه فایل‌های داده‌ای که توسط مجموعه داده خوانده می‌شود را در سطل‌های Google Cloud Storage (GCS) ذخیره کنید.

برای بیشتر موارد استفاده، توصیه می شود داده های خود را به فرمت TFRecord تبدیل کنید و برای خواندن آن از یک tf.data.TFRecordDataset استفاده کنید. برای جزئیات نحوه انجام این کار، آموزش TFRecord و tf.Example را بررسی کنید. این یک نیاز سخت نیست و می توانید از سایر خوانندگان مجموعه داده مانند tf.data.FixedLengthRecordDataset یا tf.data.TextLineDataset استفاده کنید.

می توانید کل مجموعه داده های کوچک را با استفاده از tf.data.Dataset.cache در حافظه بارگذاری کنید.

صرف نظر از فرمت داده استفاده شده، اکیداً توصیه می شود از فایل های حجیم در حد 100 مگابایت استفاده کنید. این به ویژه در این تنظیمات شبکه ای مهم است، زیرا هزینه باز کردن یک فایل به طور قابل توجهی بالاتر است.

همانطور که در کد زیر نشان داده شده است، باید از ماژول tensorflow_datasets برای دریافت یک کپی از داده های آموزشی و آزمایشی MNIST استفاده کنید. توجه داشته باشید که try_gcs برای استفاده از یک کپی مشخص شده است که در یک سطل GCS عمومی موجود است. اگر این مورد را مشخص نکنید، TPU نمی تواند به داده های دانلود شده دسترسی پیدا کند.

def get_dataset(batch_size, is_training=True):
  split = 'train' if is_training else 'test'
  dataset, info = tfds.load(name='mnist', split=split, with_info=True,
                            as_supervised=True, try_gcs=True)

  # Normalize the input data.
  def scale(image, label):
    image = tf.cast(image, tf.float32)
    image /= 255.0
    return image, label

  dataset = dataset.map(scale)

  # Only shuffle and repeat the dataset in training. The advantage of having an
  # infinite dataset for training is to avoid the potential last partial batch
  # in each epoch, so that you don't need to think about scaling the gradients
  # based on the actual batch size.
  if is_training:
    dataset = dataset.shuffle(10000)
    dataset = dataset.repeat()

  dataset = dataset.batch(batch_size)

  return dataset

مدل را با استفاده از APIهای سطح بالای Keras آموزش دهید

می توانید مدل خود را با Keras fit و API های compile کنید. در این مرحله هیچ چیز خاص TPU وجود ندارد—شما کد را طوری می نویسید که گویی از چندین GPU و یک MirroredStrategy به جای TPUStrategy استفاده می کنید. می توانید در آموزش Distributed with Keras بیشتر بدانید.

with strategy.scope():
  model = create_model()
  model.compile(optimizer='adam',
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['sparse_categorical_accuracy'])

batch_size = 200
steps_per_epoch = 60000 // batch_size
validation_steps = 10000 // batch_size

train_dataset = get_dataset(batch_size, is_training=True)
test_dataset = get_dataset(batch_size, is_training=False)

model.fit(train_dataset,
          epochs=5,
          steps_per_epoch=steps_per_epoch,
          validation_data=test_dataset, 
          validation_steps=validation_steps)
Epoch 1/5
300/300 [==============================] - 18s 32ms/step - loss: 0.1433 - sparse_categorical_accuracy: 0.9564 - val_loss: 0.0452 - val_sparse_categorical_accuracy: 0.9859
Epoch 2/5
300/300 [==============================] - 6s 21ms/step - loss: 0.0335 - sparse_categorical_accuracy: 0.9898 - val_loss: 0.0318 - val_sparse_categorical_accuracy: 0.9899
Epoch 3/5
300/300 [==============================] - 6s 21ms/step - loss: 0.0199 - sparse_categorical_accuracy: 0.9935 - val_loss: 0.0397 - val_sparse_categorical_accuracy: 0.9866
Epoch 4/5
300/300 [==============================] - 6s 21ms/step - loss: 0.0109 - sparse_categorical_accuracy: 0.9964 - val_loss: 0.0436 - val_sparse_categorical_accuracy: 0.9892
Epoch 5/5
300/300 [==============================] - 6s 21ms/step - loss: 0.0103 - sparse_categorical_accuracy: 0.9963 - val_loss: 0.0481 - val_sparse_categorical_accuracy: 0.9881
<keras.callbacks.History at 0x7f0d485602e8>

برای کاهش سربار پایتون و به حداکثر رساندن عملکرد TPU خود، آرگومان - steps_per_execution - را به Model.compile کنید. در این مثال، توان عملیاتی را حدود 50٪ افزایش می دهد:

with strategy.scope():
  model = create_model()
  model.compile(optimizer='adam',
                # Anything between 2 and `steps_per_epoch` could help here.
                steps_per_execution = 50,
                loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                metrics=['sparse_categorical_accuracy'])

model.fit(train_dataset,
          epochs=5,
          steps_per_epoch=steps_per_epoch,
          validation_data=test_dataset,
          validation_steps=validation_steps)
Epoch 1/5
300/300 [==============================] - 12s 41ms/step - loss: 0.1515 - sparse_categorical_accuracy: 0.9537 - val_loss: 0.0416 - val_sparse_categorical_accuracy: 0.9863
Epoch 2/5
300/300 [==============================] - 3s 10ms/step - loss: 0.0366 - sparse_categorical_accuracy: 0.9891 - val_loss: 0.0410 - val_sparse_categorical_accuracy: 0.9875
Epoch 3/5
300/300 [==============================] - 3s 10ms/step - loss: 0.0191 - sparse_categorical_accuracy: 0.9938 - val_loss: 0.0432 - val_sparse_categorical_accuracy: 0.9865
Epoch 4/5
300/300 [==============================] - 3s 10ms/step - loss: 0.0141 - sparse_categorical_accuracy: 0.9951 - val_loss: 0.0447 - val_sparse_categorical_accuracy: 0.9875
Epoch 5/5
300/300 [==============================] - 3s 11ms/step - loss: 0.0093 - sparse_categorical_accuracy: 0.9968 - val_loss: 0.0426 - val_sparse_categorical_accuracy: 0.9884
<keras.callbacks.History at 0x7f0d0463cd68>

مدل را با استفاده از یک حلقه آموزشی سفارشی آموزش دهید

همچنین می توانید مدل خود را با استفاده از tf.function و tf.distribute به طور مستقیم ایجاد و آموزش دهید. می‌توانید از API strategy.experimental_distribute_datasets_from_function برای توزیع مجموعه داده‌ای که تابع مجموعه داده شده است استفاده کنید. توجه داشته باشید که در مثال زیر، اندازه دسته‌ای که به مجموعه داده ارسال می‌شود، به‌جای اندازه دسته کلی، اندازه دسته‌ای در هر نسخه است. برای کسب اطلاعات بیشتر، آموزش سفارشی با آموزش tf.distribute.Strategy را بررسی کنید.

ابتدا مدل، مجموعه داده ها و tf.functions را ایجاد کنید:

# Create the model, optimizer and metrics inside the strategy scope, so that the
# variables can be mirrored on each device.
with strategy.scope():
  model = create_model()
  optimizer = tf.keras.optimizers.Adam()
  training_loss = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
  training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      'training_accuracy', dtype=tf.float32)

# Calculate per replica batch size, and distribute the datasets on each TPU
# worker.
per_replica_batch_size = batch_size // strategy.num_replicas_in_sync

train_dataset = strategy.experimental_distribute_datasets_from_function(
    lambda _: get_dataset(per_replica_batch_size, is_training=True))

@tf.function
def train_step(iterator):
  """The step function for one training step."""

  def step_fn(inputs):
    """The computation to run on each TPU device."""
    images, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(images, training=True)
      loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, logits, from_logits=True)
      loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    training_loss.update_state(loss * strategy.num_replicas_in_sync)
    training_accuracy.update_state(labels, logits)

  strategy.run(step_fn, args=(next(iterator),))
WARNING:tensorflow:From <ipython-input-1-5625c2a14441>:15: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version.
Instructions for updating:
rename to distribute_datasets_from_function
WARNING:tensorflow:From <ipython-input-1-5625c2a14441>:15: StrategyBase.experimental_distribute_datasets_from_function (from tensorflow.python.distribute.distribute_lib) is deprecated and will be removed in a future version.
Instructions for updating:
rename to distribute_datasets_from_function

سپس حلقه آموزشی را اجرا کنید:

steps_per_eval = 10000 // batch_size

train_iterator = iter(train_dataset)
for epoch in range(5):
  print('Epoch: {}/5'.format(epoch))

  for step in range(steps_per_epoch):
    train_step(train_iterator)
  print('Current step: {}, training loss: {}, accuracy: {}%'.format(
      optimizer.iterations.numpy(),
      round(float(training_loss.result()), 4),
      round(float(training_accuracy.result()) * 100, 2)))
  training_loss.reset_states()
  training_accuracy.reset_states()
Epoch: 0/5
Current step: 300, training loss: 0.1339, accuracy: 95.79%
Epoch: 1/5
Current step: 600, training loss: 0.0333, accuracy: 98.91%
Epoch: 2/5
Current step: 900, training loss: 0.0176, accuracy: 99.43%
Epoch: 3/5
Current step: 1200, training loss: 0.0126, accuracy: 99.61%
Epoch: 4/5
Current step: 1500, training loss: 0.0122, accuracy: 99.61%

بهبود عملکرد با چند مرحله در داخل tf.function

می توانید با اجرای چند مرحله در یک tf.function عملکرد را بهبود بخشید. این امر با قرار دادن فراخوان strategy.run .run با یک tf.range در داخل tf.function به دست می آید و AutoGraph آن را به tf.while_loop در کارگر TPU تبدیل می کند.

با وجود بهبود عملکرد، این روش در مقایسه با اجرای یک مرحله در داخل tf.function دارد. اجرای چند مرحله در یک tf.function انعطاف کمتری دارد—شما نمی توانید با اشتیاق یا کدهای دلخواه پایتون را در مراحل اجرا کنید.

@tf.function
def train_multiple_steps(iterator, steps):
  """The step function for one training step."""

  def step_fn(inputs):
    """The computation to run on each TPU device."""
    images, labels = inputs
    with tf.GradientTape() as tape:
      logits = model(images, training=True)
      loss = tf.keras.losses.sparse_categorical_crossentropy(
          labels, logits, from_logits=True)
      loss = tf.nn.compute_average_loss(loss, global_batch_size=batch_size)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(list(zip(grads, model.trainable_variables)))
    training_loss.update_state(loss * strategy.num_replicas_in_sync)
    training_accuracy.update_state(labels, logits)

  for _ in tf.range(steps):
    strategy.run(step_fn, args=(next(iterator),))

# Convert `steps_per_epoch` to `tf.Tensor` so the `tf.function` won't get 
# retraced if the value changes.
train_multiple_steps(train_iterator, tf.convert_to_tensor(steps_per_epoch))

print('Current step: {}, training loss: {}, accuracy: {}%'.format(
      optimizer.iterations.numpy(),
      round(float(training_loss.result()), 4),
      round(float(training_accuracy.result()) * 100, 2)))
Current step: 1800, training loss: 0.0081, accuracy: 99.74%

مراحل بعدی