Перенос обучения ЦП / ГП с несколькими рабочими

Посмотреть на TensorFlow.org Запустить в Google Colab Посмотреть исходный код на GitHub Скачать блокнот

В этом руководстве показано, как перенести рабочий процесс распределенного обучения с несколькими работниками с TensorFlow 1 на TensorFlow 2.

Чтобы выполнить обучение нескольких сотрудников с использованием ЦП/ГП:

Настраивать

Начните с некоторых необходимых импортов и простого набора данных для демонстрационных целей:

# The notebook uses a dataset instance for `Model.fit` with
# `ParameterServerStrategy`, which depends on symbols in TF 2.7.
# Install a utility needed for this demonstration
!pip install portpicker

import tensorflow as tf
import tensorflow.compat.v1 as tf1
features = [[1., 1.5], [2., 2.5], [3., 3.5]]
labels = [[0.3], [0.5], [0.7]]
eval_features = [[4., 4.5], [5., 5.5], [6., 6.5]]
eval_labels = [[0.8], [0.9], [1.]]

Вам понадобится переменная среды конфигурации 'TF_CONFIG' для обучения на нескольких машинах в TensorFlow. Используйте 'TF_CONFIG' чтобы указать адреса 'cluster' и 'task' ». (Подробнее читайте в руководстве по Distributed_training .)

import json
import os

tf_config = {
    'cluster': {
        'chief': ['localhost:11111'],
        'worker': ['localhost:12345', 'localhost:23456', 'localhost:21212'],
        'ps': ['localhost:12121', 'localhost:13131'],
    },
    'task': {'type': 'chief', 'index': 0}
}

os.environ['TF_CONFIG'] = json.dumps(tf_config)

Используйте оператор del , чтобы удалить переменную (но в реальном обучении с несколькими работниками в TensorFlow 1 вам не придется этого делать):

del os.environ['TF_CONFIG']

TensorFlow 1: распределенное обучение нескольких сотрудников с помощью API tf.estimator

Следующий фрагмент кода демонстрирует канонический рабочий процесс обучения нескольких сотрудников в TF1: вы будете использовать tf.estimator.Estimator , tf.estimator.TrainSpec , tf.estimator.EvalSpec и tf.estimator.train_and_evaluate API для распределения тренировка:

def _input_fn():
  return tf1.data.Dataset.from_tensor_slices((features, labels)).batch(1)

def _eval_input_fn():
  return tf1.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).batch(1)

def _model_fn(features, labels, mode):
  logits = tf1.layers.Dense(1)(features)
  loss = tf1.losses.mean_squared_error(labels=labels, predictions=logits)
  optimizer = tf1.train.AdagradOptimizer(0.05)
  train_op = optimizer.minimize(loss, global_step=tf1.train.get_global_step())
  return tf1.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

estimator = tf1.estimator.Estimator(model_fn=_model_fn)
train_spec = tf1.estimator.TrainSpec(input_fn=_input_fn)
eval_spec = tf1.estimator.EvalSpec(input_fn=_eval_input_fn)
tf1.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
INFO:tensorflow:Using default config.
WARNING:tensorflow:Using temporary folder as model directory: /tmp/tmpvfb91q_5
INFO:tensorflow:Using config: {'_model_dir': '/tmp/tmpvfb91q_5', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}
INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/training_util.py:401: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
INFO:tensorflow:Calling model_fn.
WARNING:tensorflow:From /tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/training/adagrad.py:143: calling Constant.__init__ (from tensorflow.python.ops.init_ops) with dtype is deprecated and will be removed in a future version.
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into /tmp/tmpvfb91q_5/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 0.038075272, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 3...
INFO:tensorflow:Saving checkpoints for 3 into /tmp/tmpvfb91q_5/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 3...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-11-13T02:31:06
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from /tmp/tmpvfb91q_5/model.ckpt-3
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Inference Time : 0.13630s
INFO:tensorflow:Finished evaluation at 2021-11-13-02:31:06
INFO:tensorflow:Saving dict for global step 3: global_step = 3, loss = 0.005215075
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 3: /tmp/tmpvfb91q_5/model.ckpt-3
INFO:tensorflow:Loss for final step: 0.061832994.
({'loss': 0.005215075, 'global_step': 3}, [])

TensorFlow 2: обучение нескольких сотрудников со стратегиями распределения

В TensorFlow 2 распределенное обучение нескольких рабочих с процессорами, графическими процессорами и TPU выполняется с помощью tf.distribute.Strategy s.

В следующем примере показано, как использовать две такие стратегии: tf.distribute.experimental.ParameterServerStrategy и tf.distribute.MultiWorkerMirroredStrategy , обе из которых предназначены для обучения CPU/GPU с несколькими рабочими процессами.

В ParameterServerStrategy используется координатор ( 'chief' ), что делает его более дружественным к среде в этом ноутбуке Colab. Здесь вы будете использовать некоторые утилиты для настройки вспомогательных элементов, необходимых для работоспособности: вы создадите внутрипроцессный кластер , где потоки используются для имитации серверов параметров ( 'ps' ) и рабочих процессов ( 'worker' ). . Дополнительные сведения об обучении сервера параметров см. в руководстве по обучению сервера параметров с помощью ParameterServerStrategy .

В этом примере сначала определите переменную среды 'TF_CONFIG' с помощью tf.distribute.cluster_resolver.TFConfigClusterResolver , чтобы предоставить информацию о кластере. Если вы используете систему управления кластером для своего распределенного обучения, проверьте, предоставляет ли она уже 'TF_CONFIG' для вас, и в этом случае вам не нужно явно устанавливать эту переменную среды. (Подробнее читайте в разделе «Настройка переменной среды 'TF_CONFIG' в руководстве « Распределенное обучение с помощью TensorFlow ».)

# Find ports that are available for the `'chief'` (the coordinator),
# `'worker'`s, and `'ps'` (parameter servers).
import portpicker

chief_port = portpicker.pick_unused_port()
worker_ports = [portpicker.pick_unused_port() for _ in range(3)]
ps_ports = [portpicker.pick_unused_port() for _ in range(2)]

# Dump the cluster information to `'TF_CONFIG'`.
tf_config = {
    'cluster': {
        'chief': ["localhost:%s" % chief_port],
        'worker': ["localhost:%s" % port for port in worker_ports],
        'ps':  ["localhost:%s" % port for port in ps_ports],
    },
    'task': {'type': 'chief', 'index': 0}
}
os.environ['TF_CONFIG'] = json.dumps(tf_config)

# Use a cluster resolver to bridge the information to the strategy created below.
cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

Затем создайте tf.distribute.Server для рабочих процессов и серверов параметров один за другим:

# Workers need some inter_ops threads to work properly.
# This is only needed for this notebook to demo. Real servers
# should not need this.
worker_config = tf.compat.v1.ConfigProto()
worker_config.inter_op_parallelism_threads = 4

for i in range(3):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="worker",
      task_index=i,
      config=worker_config)

for i in range(2):
  tf.distribute.Server(
      cluster_resolver.cluster_spec(),
      job_name="ps",
      task_index=i)

В реальном распределенном обучении вместо того, чтобы запускать все tf.distribute.Server на координаторе, вы будете использовать несколько машин, и каждый из них, обозначенный как "worker" и "ps" (серверы параметров), будет запустите tf.distribute.Server . Дополнительные сведения см. в разделе « Кластеры в реальном мире » в учебном пособии по серверу параметров .

Когда все готово, создайте объект ParameterServerStrategy :

strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)
INFO:tensorflow:`tf.distribute.experimental.ParameterServerStrategy` is initialized with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']})
INFO:tensorflow:ParameterServerStrategyV2 is now connecting to cluster with cluster_spec: ClusterSpec({'chief': ['localhost:16660'], 'ps': ['localhost:15313', 'localhost:20369'], 'worker': ['localhost:21380', 'localhost:18699', 'localhost:19420']})
INFO:tensorflow:ParameterServerStrategy (CentralStorageStrategy if you are using a single machine) with compute_devices = ['/job:chief/replica:0/task:0/device:GPU:0'], variable_device = '/job:chief/replica:0/task:0/device:GPU:0'
INFO:tensorflow:Number of GPUs on workers: 1

После создания объекта стратегии определите модель, оптимизатор и другие переменные, а затем вызовите Model.compile в API Strategy.scope для распространения обучения. (Дополнительную информацию см. в документации по API Strategy.scope .)

Если вы предпочитаете настраивать обучение, например, определяя проходы вперед и назад, обратитесь к разделу «Обучение с пользовательским циклом обучения » в учебном пособии по серверу параметров для получения дополнительных сведений.

dataset = tf.data.Dataset.from_tensor_slices(
      (features, labels)).shuffle(10).repeat().batch(64)

eval_dataset = tf.data.Dataset.from_tensor_slices(
      (eval_features, eval_labels)).repeat().batch(1)

with strategy.scope():
  model = tf.keras.models.Sequential([tf.keras.layers.Dense(1)])
  optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.05)
  model.compile(optimizer, "mse")

model.fit(dataset, epochs=5, steps_per_epoch=10)
Epoch 1/5
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
/tmpfs/src/tf_docs_env/lib/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py:453: UserWarning: To make it possible to preserve tf.data options across serialization boundaries, their implementation has moved to be part of the TensorFlow graph. As a consequence, the options value is in general no longer known at graph construction time. Invoking this method in graph mode retains the legacy behavior of the original implementation, but note that the returned value might not reflect the actual value of the options.
  warnings.warn("To make it possible to preserve tf.data options across "
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
INFO:tensorflow:Reduce to /device:CPU:0 then broadcast to ('/replica:0/device:CPU:0',).
2021-11-13 02:31:09.110074: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-11-13 02:31:09.115349: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-11-13 02:31:09.117963: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:4"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 - 3s - loss: 7.4912 - 3s/epoch - 259ms/step
Epoch 2/5
10/10 - 0s - loss: 3.3420 - 43ms/epoch - 4ms/step
Epoch 3/5
10/10 - 0s - loss: 1.9022 - 44ms/epoch - 4ms/step
Epoch 4/5
10/10 - 0s - loss: 1.1536 - 42ms/epoch - 4ms/step
Epoch 5/5
10/10 - 0s - loss: 0.7208 - 43ms/epoch - 4ms/step
<keras.callbacks.History at 0x7f45d83f3a50>
model.evaluate(eval_dataset, steps=10, return_dict=True)
1/10 [==>...........................] - ETA: 11s - loss: 2.4114
2021-11-13 02:31:10.757780: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}

2021-11-13 02:31:10.910985: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
10/10 [==============================] - 2s 38ms/step - loss: 3.8431
2021-11-13 02:31:11.053772: W tensorflow/core/grappler/optimizers/data/auto_shard.cc:766] AUTO sharding policy will apply DATA sharding policy as it failed to apply FILE sharding policy because of the following reason: Found an unshardable source dataset: name: "TensorSliceDataset/_2"
op: "TensorSliceDataset"
input: "Placeholder/_0"
input: "Placeholder/_1"
attr {
  key: "Toutput_types"
  value {
    list {
      type: DT_FLOAT
      type: DT_FLOAT
    }
  }
}
attr {
  key: "_cardinality"
  value {
    i: 3
  }
}
attr {
  key: "is_files"
  value {
    b: false
  }
}
attr {
  key: "metadata"
  value {
    s: "\n\024TensorSliceDataset:8"
  }
}
attr {
  key: "output_shapes"
  value {
    list {
      shape {
        dim {
          size: 2
        }
      }
      shape {
        dim {
          size: 1
        }
      }
    }
  }
}
{'loss': 3.843122}

Разделители ( tf.distribute.experimental.partitioners )

ParameterServerStrategy в TensorFlow 2 поддерживает переменное разбиение и предлагает те же разделители, что и TensorFlow 1, с менее запутанными именами: — tf.compat.v1.variable_axis_size_partitioner -> tf.distribute.experimental.partitioners.MaxSizePartitioner : разделитель, который поддерживает осколки меньше максимального размера) . - tf.compat.v1.min_max_variable_partitioner -> tf.distribute.experimental.partitioners.MinSizePartitioner : разделитель, который выделяет минимальный размер для каждого сегмента. - tf.compat.v1.fixed_size_partitioner -> tf.distribute.experimental.partitioners.FixedShardsPartitioner : разделитель, который выделяет фиксированное количество осколков.

В качестве альтернативы вы можете использовать объект MultiWorkerMirroredStrategy :

# To clean up the `TF_CONFIG` used for `ParameterServerStrategy`.
del os.environ['TF_CONFIG']
strategy = tf.distribute.MultiWorkerMirroredStrategy()
WARNING:tensorflow:Collective ops is not configured at program startup. Some performance features may not be enabled.
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:GPU:0',), communication = CommunicationImplementation.AUTO

Вы можете заменить стратегию, использованную выше, объектом MultiWorkerMirroredStrategy для выполнения обучения с этой стратегией.

Как и в случае с API-интерфейсами tf.estimator , поскольку MultiWorkerMirroredStrategy является мультиклиентской стратегией, нет простого способа запустить распределенное обучение в этом блокноте Colab. Таким образом, замена приведенного выше кода этой стратегией приводит к локальному запуску. Обучение нескольких рабочих с Keras Model.fit/учебники по пользовательскому циклу обучения демонстрируют, как запустить обучение нескольких рабочих с настроенной переменной 'TF_CONFIG' с двумя рабочими на локальном хосте в Colab. На практике вы должны создать несколько рабочих процессов на внешних IP-адресах/портах и ​​использовать переменную 'TF_CONFIG' для указания конфигурации кластера для каждого рабочего процесса.

Следующие шаги

Чтобы узнать больше о распределенном обучении с несколькими работниками с помощью tf.distribute.experimental.ParameterServerStrategy и tf.distribute.MultiWorkerMirroredStrategy в TensorFlow 2, рассмотрите следующие ресурсы: