Vòng đào tạo tùy chỉnh với Keras và MultiWorkerMirroredStrategy

Xem trên TensorFlow.org Chạy trong Google Colab Xem nguồn trên GitHub Tải xuống sổ ghi chép

Tổng quat

Hướng dẫn này trình bày việc đào tạo nhiều nhân viên với API vòng lặp huấn luyện tùy chỉnh, được phân phối qua MultiWorkerMirroredStrategy, vì vậy mô hình Keras được thiết kế để chạy trên một nhân viên có thể hoạt động liền mạch trên nhiều nhân viên với sự thay đổi mã tối thiểu.

Chúng tôi đang sử dụng các vòng đào tạo tùy chỉnh để đào tạo mô hình của mình vì chúng mang lại cho chúng tôi sự linh hoạt và khả năng kiểm soát tốt hơn trong quá trình đào tạo. Hơn nữa, nó dễ dàng hơn để gỡ lỗi mô hình và vòng lặp đào tạo. Thông tin chi tiết hơn có sẵn trong Viết một vòng đào tạo từ đầu .

Nếu bạn đang tìm cách sử dụng MultiWorkerMirroredStrategy với keras model.fit , hãy tham khảo hướng dẫn này để thay thế.

Hướng dẫn Đào tạo Phân tán trong TensorFlow có sẵn để có cái nhìn tổng quan về các chiến lược phân phối mà TensorFlow hỗ trợ cho những người quan tâm đến việc hiểu sâu hơn về các API tf.distribute.Strategy .

Thành lập

Đầu tiên, một số mặt hàng nhập khẩu cần thiết.

import json
import os
import sys

Trước khi nhập TensorFlow, hãy thực hiện một vài thay đổi đối với môi trường.

Tắt tất cả các GPU. Điều này ngăn ngừa các lỗi gây ra bởi tất cả các công nhân đang cố gắng sử dụng cùng một GPU. Đối với một ứng dụng thực tế, mỗi công nhân sẽ làm việc trên một máy khác nhau.

os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

Đặt lại biến môi trường TF_CONFIG , bạn sẽ thấy thêm về điều này sau.

os.environ.pop('TF_CONFIG', None)

Đảm bảo rằng thư mục hiện tại nằm trên đường dẫn của python. Điều này cho phép sổ ghi chép nhập các tệp được ghi bởi %%writefile sau này.

if '.' not in sys.path:
  sys.path.insert(0, '.')

Bây giờ nhập TensorFlow.

import tensorflow as tf

Tập dữ liệu và định nghĩa mô hình

Tiếp theo, tạo tệp mnist.py với thiết lập mô hình và tập dữ liệu đơn giản. Tệp python này sẽ được sử dụng bởi worker-process trong hướng dẫn này:

%%writefile mnist.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the range [0, 255].
  # You need to convert them to float32 with values in the range [0, 1]
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000)
  return train_dataset

def dataset_fn(global_batch_size, input_context):
  batch_size = input_context.get_per_replica_batch_size(global_batch_size)
  dataset = mnist_dataset(batch_size)
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  dataset = dataset.batch(batch_size)
  return dataset

def build_cnn_model():
  return tf.keras.Sequential([
      tf.keras.Input(shape=(28, 28)),
      tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
      tf.keras.layers.Conv2D(32, 3, activation='relu'),
      tf.keras.layers.Flatten(),
      tf.keras.layers.Dense(128, activation='relu'),
      tf.keras.layers.Dense(10)
  ])
Writing mnist.py

Cấu hình nhiều nhân viên

Bây giờ chúng ta hãy bước vào thế giới của đào tạo nhiều nhân viên. Trong TensorFlow, biến môi trường TF_CONFIG là bắt buộc để huấn luyện trên nhiều máy, mỗi máy có thể có một vai trò khác nhau. TF_CONFIG được sử dụng bên dưới, là một chuỗi JSON được sử dụng để chỉ định cấu hình cụm trên mỗi worker là một phần của cụm. Đây là phương pháp mặc định để chỉ định một cụm, sử dụng cluster_resolver.TFConfigClusterResolver , nhưng có các tùy chọn khác có sẵn trong mô-đun distribute.cluster_resolver phối.cluster_resolver.

Mô tả cụm của bạn

Đây là một cấu hình ví dụ:

tf_config = {
    'cluster': {
        'worker': ['localhost:12345', 'localhost:23456']
    },
    'task': {'type': 'worker', 'index': 0}
}

Đây là cùng một TF_CONFIG được tuần tự hóa thành một chuỗi JSON:

json.dumps(tf_config)
'{"cluster": {"worker": ["localhost:12345", "localhost:23456"]}, "task": {"type": "worker", "index": 0} }'

Có hai thành phần của TF_CONFIG : clustertask .

  • cluster giống nhau đối với tất cả công nhân và cung cấp thông tin về nhóm đào tạo, là một mệnh lệnh bao gồm các loại công việc khác nhau, chẳng hạn như worker . Trong quá trình đào tạo nhiều nhân viên với MultiWorkerMirroredStrategy , thường có một worker đảm nhận nhiều trách nhiệm hơn một chút như lưu điểm kiểm tra và viết tệp tóm tắt cho TensorBoard ngoài những gì một worker bình thường làm. Một công nhân như vậy được gọi là công nhân chief , và theo thông lệ, workerindex 0 được chỉ định làm worker chính (trên thực tế đây là cách tf.distribute.Strategy được triển khai).

  • task cung cấp thông tin của task hiện tại và khác nhau trên mỗi worker. Nó chỉ định typeindex của công nhân đó.

Trong ví dụ này, bạn đặt type nhiệm vụ thành "worker"index nhiệm vụ thành 0 . Máy này là công nhân đầu tiên sẽ được chỉ định làm công nhân chính và làm nhiều việc hơn các máy khác. Lưu ý rằng các máy khác cũng sẽ cần phải đặt biến môi trường TF_CONFIG và nó phải có cùng một cluster chính, nhưng type tác vụ hoặc index tác vụ khác nhau tùy thuộc vào vai trò của các máy đó.

Với mục đích minh họa, hướng dẫn này cho thấy cách một người có thể thiết lập một TF_CONFIG với 2 công nhân trên localhost . Trên thực tế, người dùng sẽ tạo nhiều worker trên các địa chỉ / cổng IP bên ngoài và đặt TF_CONFIG trên từng worker một cách thích hợp.

Trong ví dụ này, bạn sẽ sử dụng 2 công nhân, TF_CONFIG của công nhân đầu tiên được hiển thị ở trên. Đối với công nhân thứ hai, bạn sẽ đặt tf_config['task']['index']=1

Ở trên, tf_config chỉ là một biến cục bộ trong python. Để thực sự sử dụng nó để định cấu hình đào tạo, từ điển này cần được tuần tự hóa dưới dạng JSON và được đặt trong biến môi trường TF_CONFIG .

Biến môi trường và quy trình con trong sổ ghi chép

Các quy trình con kế thừa các biến môi trường từ cha của chúng. Vì vậy, nếu bạn đặt một biến môi trường trong quy trình jupyter notebook này:

os.environ['GREETINGS'] = 'Hello TensorFlow!'

Bạn có thể truy cập biến môi trường từ một quy trình con:

echo ${GREETINGS}
Hello TensorFlow!

Trong phần tiếp theo, bạn sẽ sử dụng điều này để chuyển TF_CONFIG đến các quy trình con của worker. Bạn sẽ không bao giờ thực sự khởi động công việc của mình theo cách này, nhưng nó đủ cho các mục đích của hướng dẫn này: Để minh họa một ví dụ tối thiểu về nhiều nhân viên.

MultiWorkerMirroredStrategy

Để đào tạo mô hình, hãy sử dụng một phiên bản của tf.distribute.MultiWorkerMirroredStrategy , tạo bản sao của tất cả các biến trong các lớp của mô hình trên mỗi thiết bị trên tất cả các worker. Hướng dẫn tf.distribute.Strategy có thêm chi tiết về chiến lược này.

strategy = tf.distribute.MultiWorkerMirroredStrategy()
2022-01-26 05:35:39.353025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
2022-01-26 05:35:39.353298: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

Sử dụng tf.distribute.Strategy.scope để chỉ định rằng một chiến lược nên được sử dụng khi xây dựng mô hình của bạn. Điều này đặt bạn vào " bối cảnh sao chép chéo " cho chiến lược này, có nghĩa là chiến lược được đặt trong tầm kiểm soát của những thứ như vị trí thay đổi.

import mnist
with strategy.scope():
  # Model building needs to be within `strategy.scope()`.
  multi_worker_model = mnist.build_cnn_model()

Tự động phân chia dữ liệu của bạn trên các công nhân

Trong đào tạo nhiều nhân viên, không nhất thiết phải cần đến phân tách tập dữ liệu, tuy nhiên, nó cung cấp cho bạn ngữ nghĩa chính xác một lần, giúp quá trình đào tạo dễ tái tạo hơn, tức là đào tạo trên nhiều công nhân cũng giống như đào tạo trên một công nhân. Lưu ý: hiệu suất có thể bị ảnh hưởng trong một số trường hợp.

Xem: distribute_datasets_from_function

per_worker_batch_size = 64
num_workers = len(tf_config['cluster']['worker'])
global_batch_size = per_worker_batch_size * num_workers

with strategy.scope():
  multi_worker_dataset = strategy.distribute_datasets_from_function(
      lambda input_context: mnist.dataset_fn(global_batch_size, input_context))

Xác định vòng lặp đào tạo tùy chỉnh và đào tạo mô hình

Chỉ định một trình tối ưu hóa

with strategy.scope():
  # The creation of optimizer and train_accuracy will need to be in
  # `strategy.scope()` as well, since they create variables.
  optimizer = tf.keras.optimizers.RMSprop(learning_rate=0.001)
  train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
      name='train_accuracy')

Xác định một bước đào tạo với tf.function

@tf.function
def train_step(iterator):
  """Training step function."""

  def step_fn(inputs):
    """Per-Replica step function."""
    x, y = inputs
    with tf.GradientTape() as tape:
      predictions = multi_worker_model(x, training=True)
      per_batch_loss = tf.keras.losses.SparseCategoricalCrossentropy(
          from_logits=True,
          reduction=tf.keras.losses.Reduction.NONE)(y, predictions)
      loss = tf.nn.compute_average_loss(
          per_batch_loss, global_batch_size=global_batch_size)

    grads = tape.gradient(loss, multi_worker_model.trainable_variables)
    optimizer.apply_gradients(
        zip(grads, multi_worker_model.trainable_variables))
    train_accuracy.update_state(y, predictions)
    return loss

  per_replica_losses = strategy.run(step_fn, args=(next(iterator),))
  return strategy.reduce(
      tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

Lưu và khôi phục điểm kiểm tra

Việc triển khai điểm kiểm tra trong Vòng lặp đào tạo tùy chỉnh yêu cầu người dùng xử lý nó thay vì sử dụng lệnh gọi lại keras. Nó cho phép bạn lưu trọng lượng của mô hình và khôi phục chúng mà không cần phải lưu toàn bộ mô hình.

from multiprocessing import util
checkpoint_dir = os.path.join(util.get_temp_dir(), 'ckpt')

def _is_chief(task_type, task_id, cluster_spec):
  return (task_type is None
          or task_type == 'chief'
          or (task_type == 'worker'
              and task_id == 0
              and "chief" not in cluster_spec.as_dict()))

def _get_temp_dir(dirpath, task_id):
  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id, cluster_spec):
  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id, cluster_spec):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

Tại đây, bạn sẽ tạo một tf.train.Checkpoint theo dõi mô hình, được quản lý bởi tf.train.CheckpointManager để chỉ có điểm kiểm tra mới nhất được giữ nguyên.

epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64), name='epoch')
step_in_epoch = tf.Variable(
    initial_value=tf.constant(0, dtype=tf.dtypes.int64),
    name='step_in_epoch')
task_type, task_id = (strategy.cluster_resolver.task_type,
                      strategy.cluster_resolver.task_id)
# We normally don't need to manually instantiate a ClusterSpec, but in this 
# illustrative example we did not set TF_CONFIG before initializing the
# strategy. See the next section for "real-world" usage.
cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])

checkpoint = tf.train.Checkpoint(
    model=multi_worker_model, epoch=epoch, step_in_epoch=step_in_epoch)

write_checkpoint_dir = write_filepath(checkpoint_dir, task_type, task_id,
                                      cluster_spec)
checkpoint_manager = tf.train.CheckpointManager(
    checkpoint, directory=write_checkpoint_dir, max_to_keep=1)

Bây giờ, khi bạn cần khôi phục, bạn có thể tìm thấy điểm kiểm tra mới nhất được lưu bằng chức năng tf.train.latest_checkpoint tiện lợi.

latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
  checkpoint.restore(latest_checkpoint)

Sau khi khôi phục điểm kiểm tra, bạn có thể tiếp tục đào tạo vòng đào tạo tùy chỉnh của mình.

num_epochs = 3
num_steps_per_epoch = 70

while epoch.numpy() < num_epochs:
  iterator = iter(multi_worker_dataset)
  total_loss = 0.0
  num_batches = 0

  while step_in_epoch.numpy() < num_steps_per_epoch:
    total_loss += train_step(iterator)
    num_batches += 1
    step_in_epoch.assign_add(1)

  train_loss = total_loss / num_batches
  print('Epoch: %d, accuracy: %f, train_loss: %f.'
                %(epoch.numpy(), train_accuracy.result(), train_loss))

  train_accuracy.reset_states()

  # Once the `CheckpointManager` is set up, you're now ready to save, and remove
  # the checkpoints non-chief workers saved.
  checkpoint_manager.save()
  if not _is_chief(task_type, task_id, cluster_spec):
    tf.io.gfile.rmtree(write_checkpoint_dir)

  epoch.assign_add(1)
  step_in_epoch.assign(0)
2022-01-26 05:35:40.200068: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.849107, train_loss: 0.491886.
Epoch: 1, accuracy: 0.937835, train_loss: 0.197650.
Epoch: 2, accuracy: 0.963170, train_loss: 0.129683.

Thiết lập mã đầy đủ trên công nhân

Để thực sự chạy với MultiWorkerMirroredStrategy , bạn sẽ cần chạy các quy trình công nhân và chuyển TF_CONFIG cho chúng.

Giống như tệp mnist.py viết trước đó, đây là tệp main.py chứa cùng một mã mà chúng tôi đã xem qua từng bước trước đó trong chuyên mục này, chúng tôi chỉ ghi nó vào một tệp để mỗi công nhân sẽ chạy nó:

Tệp: main.py

Writing main.py

Đào tạo và đánh giá

Thư mục hiện tại hiện chứa cả hai tệp Python:

ls *.py
main.py
mnist.py

Vì vậy, json-serialize TF_CONFIG và thêm nó vào các biến môi trường:

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Bây giờ, bạn có thể khởi chạy một quy trình công nhân sẽ chạy main.py và sử dụng TF_CONFIG :

# first kill any previous runs
%killbgscripts
All background processes were killed.
python main.py &> job_0.log

Có một số điều cần lưu ý về lệnh trên:

  1. Nó sử dụng %%bash là một "phép thuật" sổ ghi chép để chạy một số lệnh bash.
  2. Nó sử dụng cờ --bg để chạy quá trình bash trong nền, bởi vì công nhân này sẽ không kết thúc. Nó đợi tất cả các công nhân trước khi nó bắt đầu.

Quá trình công nhân nền sẽ không in đầu ra cho sổ tay này, vì vậy &> chuyển hướng đầu ra của nó thành một tệp, vì vậy bạn có thể xem điều gì đã xảy ra.

Vì vậy, hãy đợi vài giây để quá trình bắt đầu:

import time
time.sleep(20)

Bây giờ hãy xem những gì đã được xuất vào logfile của worker cho đến nay:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration

Dòng cuối cùng của tệp nhật ký sẽ có nội dung: Started server with target: grpc://localhost:12345 . Công nhân đầu tiên hiện đã sẵn sàng và đang đợi tất cả (các) công nhân khác sẵn sàng để tiếp tục.

Vì vậy, hãy cập nhật tf_config cho quy trình của worker thứ hai để nhận:

tf_config['task']['index'] = 1
os.environ['TF_CONFIG'] = json.dumps(tf_config)

Bây giờ khởi chạy công nhân thứ hai. Điều này sẽ bắt đầu đào tạo vì tất cả công nhân đều đang hoạt động (vì vậy không cần phải làm nền tảng cho quá trình này):

python main.py > /dev/null 2>&1

Bây giờ nếu bạn kiểm tra lại nhật ký được viết bởi công nhân đầu tiên, bạn sẽ thấy rằng nó đã tham gia đào tạo mô hình đó:

cat job_0.log
2022-01-26 05:35:49.225025: E tensorflow/stream_executor/cuda/cuda_driver.cc:271] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2022-01-26 05:35:49.225297: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 470.63.1 does not match DSO version 470.86.0 -- cannot find working devices in this configuration
2022-01-26 05:36:10.343173: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch: 0, accuracy: 0.832589, train_loss: 0.531260.
Epoch: 1, accuracy: 0.936161, train_loss: 0.214774.
Epoch: 2, accuracy: 0.958594, train_loss: 0.140772.
# Delete the `TF_CONFIG`, and kill any background tasks so they don't affect the next section.
os.environ.pop('TF_CONFIG', None)
%killbgscripts
All background processes were killed.

Đào tạo chuyên sâu cho nhiều công nhân

Hướng dẫn này đã trình bày quy trình làm việc Custom Training Loop của thiết lập nhiều nhân viên. Mô tả chi tiết về các chủ đề khác có sẵn trong model.fit's guide thiết lập nhiều nhân viên và áp dụng cho CTL.

Xem thêm

  1. Hướng dẫn Đào tạo Phân tán trong TensorFlow cung cấp tổng quan về các chiến lược phân phối có sẵn.
  2. Các mô hình chính thức , nhiều mô hình trong số đó có thể được định cấu hình để chạy nhiều chiến lược phân phối.
  3. Phần Hiệu suất trong hướng dẫn cung cấp thông tin về các chiến lược và công cụ khác mà bạn có thể sử dụng để tối ưu hóa hiệu suất của các mô hình TensorFlow của mình.